Incremental Updates of Polygon Events

Goal: retrieve only the events that have changed (added, updated, or deleted) since the last run, without downloading the entire dataset each time.

Context: The AXSMarine API exposes two GraphQL endpoints:

  • polygonEvents – the active events.
  • deletedPolygonEvents – the events that have been removed.

1. Prerequisites

  1. Access Token – Obtain your bearer token from AXSMarine and replace the INSERT MY TOKEN placeholder in the script.
  2. Python 3.9+ – The script uses requests and pandas.
  3. Persistence – A local CSV cache is used; you can swap it for a database or cloud storage if desired.
  4. Timestamp handling – lastUpdated is inclusive; subtract 24 h from the cache file's last modification time so that events updated on the boundary day are not missed (see the sketch below).
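
For illustration, here is a minimal sketch of the boundary handling from step 4; it is the same logic the full script below uses (the CSV file name is simply the example used there):

Python
import os
from datetime import datetime, timedelta

CSV_FILE = "2025_shipyard_events.csv"  # assumes the cache file already exists

# lastUpdated is an inclusive filter, so step back one day from the cache's
# modification time to be sure boundary-day updates are fetched again.
last_exec_ts = datetime.fromtimestamp(os.path.getmtime(CSV_FILE)) - timedelta(days=1)
last_execution_time = last_exec_ts.strftime("%Y-%m-%d")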


2. API Architecture

GraphQL
# 1. polygonEvents
type Query {
  polygonEvents(
    first: Int,
    last: Int,
    before: String,
    after: String,
    isOpen: Boolean,
    polygonIds: [Int],
    polygonTypes: [String],
    entryDate: RangeDate,
    outDate: RangeDate,
    entryDraft: RangeFloat,
    entryHeading: RangeInt,
    entrySpeed: RangeFloat,
    outDraft: RangeFloat,
    outHeading: RangeInt,
    outSpeed: RangeFloat,
    duration: RangeInt,
    lastUpdated: RangeDate,
    vesselBuilt: RangeDate,
    vesselBeam: RangeFloat,
    vesselLoa: RangeFloat,
    vesselDraft: RangeFloat,
    vesselDwt: RangeInt,
    vesselTeu: RangeInt,
    vesselCubic: RangeInt,
    vesselIds: [Int],
    vesselImos: [Int],
    vesselSegments: [String],
    vesselTypes: [String],
    vesselSubtypes: [String],
    vesselLinerServiceIds: [Int],
    vesselLinerRegionIds: [Int]
  ): PolygonEventCursorConnection
}

# 2. deletedPolygonEvents
type Query {
  deletedPolygonEvents(
    first: Int,
    last: Int,
    before: String,
    after: String,
    polygonIds: [Int],
    polygonTypes: [String],
    deletedAt: RangeDate,
    vesselIds: [Int],
    vesselImos: [Int],
    vesselSegments: [String],
    vesselTypes: [String],
    vesselSubtypes: [String]
  ): deletedPolygonEventCursorConnection
}

Pagination – Each response returns a pageInfo object (endCursor, startCursor) and a list of edges. Pass the previous page's endCursor as the after argument to iterate over the entire set.
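
As a sketch of that pattern (fetch_page is a hypothetical callable standing in for whatever executes the GraphQL request; section 4 shows the real implementation):

Python
def fetch_all(fetch_page, page_size: int = 5000) -> list:
    """Generic cursor pagination: request pages until one comes back short."""
    records, cursor = [], None
    while True:
        page = fetch_page(first=page_size, after=cursor)  # hypothetical callable
        edges = page["edges"]
        records.extend(edge["node"] for edge in edges)
        if len(edges) < page_size:
            break
        cursor = page["pageInfo"]["endCursor"]
    return records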


3. Workflow Overview

  1. Load the local cache (or initialize an empty dataframe).
  2. Determine the update window:
    1. If the cache file exists, the last-run timestamp is its modification time minus 24 h.
    2. Otherwise, this is the first run, and we’ll request all events.
  3. Query polygonEvents with filters polygonTypes, entryDate, lastUpdated.
  4. Loop over pages until the number of records is < pageSize.
  5. Merge the new data into the dataframe and deduplicate on _id.
  6. Query deletedPolygonEvents for the same window (deletedAt).
  7. Remove rows whose _id appears in the deletion list.
  8. Save the final DataFrame to a CSV file.

4. Complete Python Example

Tip: Store your token in a .env file or an environment variable; the script below hard-codes it in plain text purely for demonstration.
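
A minimal sketch of that, assuming the token has been exported as an environment variable named AXS_TOKEN (the variable name is illustrative):

Python
import os

# `export AXS_TOKEN="..."` must have been run in the shell beforehand.
TOKEN = os.environ["AXS_TOKEN"]  # raises KeyError if the variable is missing
HEADERS = {"Authorization": f"Bearer {TOKEN}"}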

Python
#!/usr/bin/env python3
import os
import logging
import requests
import pandas as pd
from datetime import datetime, timedelta

# ------------------------------------------------------------------
# 1. Configuration
# ------------------------------------------------------------------
TOKEN = "INSERT MY TOKEN"  # ← Replace with your token
HEADERS = {"Authorization": f"Bearer {TOKEN}"}
BASE_URL = "https://apihub.axsmarine.com/global/events/v1"

# Example: only retrieve "shipyard" events.
POLYGON_TYPES = ["shipyard"]
ENTRY_START_DATE = "2025-01-01"  # ISO-8601
CSV_FILE = "2025_shipyard_events.csv"

# ------------------------------------------------------------------
# 2. Load cache (if present)
# ------------------------------------------------------------------
last_execution_time: str | None = None
data = pd.DataFrame()

if os.path.exists(CSV_FILE):
    # Ensure we don't miss events updated the same day.
    last_exec_ts = datetime.fromtimestamp(os.path.getmtime(CSV_FILE)) - timedelta(days=1)
    last_execution_time = last_exec_ts.strftime("%Y-%m-%d")
    data = pd.read_csv(CSV_FILE)

# ------------------------------------------------------------------
# 3. Generic GraphQL query helper
# ------------------------------------------------------------------
def gql_query(query: str, variables: dict) -> dict:
    """Execute a GraphQL request and return the JSON payload."""
    logging.debug(f"Request variables: {variables}")
    resp = requests.post(BASE_URL, json={"query": query, "variables": variables}, headers=HEADERS)
    resp.raise_for_status()
    return resp.json()["data"]

# ------------------------------------------------------------------
# 4. Retrieve active polygon events
# ------------------------------------------------------------------
def fetch_polygon_events() -> pd.DataFrame:
    """Fetch active events, paginating until exhaustion."""
    page_size = 5000
    variables = {
        "pageSize": page_size,
        "polygonTypes": POLYGON_TYPES,
        "entryDate": {"from": ENTRY_START_DATE},
        "lastUpdated": {"from": last_execution_time} if last_execution_time else None,
    }
    query = """
    query polygonEvents(
        $pageSize: Int,
        $afterCursor: String,
        $polygonTypes: [String],
        $entryDate: RangeDate,
        $lastUpdated: RangeDate
    ) {
        polygonEvents(
            first: $pageSize,
            after: $afterCursor,
            polygonTypes: $polygonTypes,
            entryDate: $entryDate,
            lastUpdated: $lastUpdated
        ) {
            pageInfo { endCursor }
            edges {
                node {
                    _id
                    polygon { _id, name }
                    vessel { _id, imo, name, type }
                    entryAis { time }
                    outAis { time }
                    lastUpdated
                }
            }
        }
    }
    """
    all_rows = []
    cursor = None
    while True:
        variables["afterCursor"] = cursor
        payload = gql_query(query, variables)
        edges = payload["polygonEvents"]["edges"]
        rows = pd.json_normalize([e["node"] for e in edges])
        all_rows.append(rows)
        if len(edges) < page_size:
            break
        cursor = payload["polygonEvents"]["pageInfo"]["endCursor"]
    if all_rows:
        df = pd.concat(all_rows, ignore_index=True)
        return df
    return pd.DataFrame()

# ------------------------------------------------------------------
# 5. Remove deleted events
# ------------------------------------------------------------------
def purge_deleted_events(df: pd.DataFrame) -> pd.DataFrame:
    """Delete rows that match events reported as deleted."""
    if last_execution_time is None:
        # First run → nothing to purge
        return df
    page_size = 1000
    variables = {
        "pageSize": page_size,
        "polygonTypes": POLYGON_TYPES,
        "deletedAt": {"from": last_execution_time},
    }
    query = """
    query deletedPolygonEvents(
        $pageSize: Int,
        $afterCursor: String,
        $polygonTypes: [String],
        $deletedAt: RangeDate
    ) {
        deletedPolygonEvents(
            first: $pageSize,
            after: $afterCursor,
            polygonTypes: $polygonTypes,
            deletedAt: $deletedAt
        ) {
            pageInfo { endCursor }
            edges { node { _id } }
        }
    }
    """
    cursor = None
    deleted_ids = set()
    while True:
        variables["afterCursor"] = cursor
        payload = gql_query(query, variables)
        edges = payload["deletedPolygonEvents"]["edges"]
        deleted_ids.update(e["node"]["_id"] for e in edges)
        if len(edges) < page_size:
            break
        cursor = payload["deletedPolygonEvents"]["pageInfo"]["endCursor"]
    if deleted_ids:
        df = df[~df["_id"].isin(deleted_ids)].reset_index(drop=True)
    return df

# ------------------------------------------------------------------
# 6. Main integration
# ------------------------------------------------------------------
def main(data):
    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

    # 6.1 Fetch new/updated events
    new_events = fetch_polygon_events()
    logging.info(f"Fetched {len(new_events)} new/updated events")

    # 6.2 Merge with existing cache
    if not data.empty:
        data = pd.concat([data, new_events], ignore_index=True)
        data = data.drop_duplicates(subset=["_id"], keep="last")
    else:
        data = new_events

    # 6.3 Remove events that were deleted
    data = purge_deleted_events(data)
    logging.info(f"Dataset after purge: {len(data)} records")

    # 6.4 Persist
    data.to_csv(CSV_FILE, index=False)
    logging.info(f"Persisted {CSV_FILE}")


if __name__ == "__main__":
    main(data)

Implementation notes

  • pandas.json_normalize flattens the nested GraphQL structure into a tidy dataframe (see the sketch below).
  • The lastUpdated filter is inclusive; subtracting 24 h from the file’s mtime guarantees that we don’t miss updates on the boundary day.
  • pageSize can be tuned to match network capacity and available memory.
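
A small illustration of that flattening (the node values are made up; only the shape matches the query above):

Python
import pandas as pd

node = {
    "_id": 123,
    "polygon": {"_id": 7, "name": "Example Shipyard"},
    "vessel": {"_id": 42, "imo": 9123456, "name": "EXAMPLE VESSEL", "type": "Bulker"},
    "entryAis": {"time": "2025-03-01T08:00:00Z"},
    "outAis": {"time": "2025-03-05T16:30:00Z"},
    "lastUpdated": "2025-03-05T17:00:00Z",
}

df = pd.json_normalize([node])
print(sorted(df.columns))
# Nested keys become dot-separated columns such as 'polygon.name', 'vessel.imo'
# and 'entryAis.time', alongside the scalar '_id' and 'lastUpdated' columns.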

5. Best Practices

  • Secure token – Store it in a .env file or an environment variable.
  • Limit pageSize – Prevents server- or client-side time-outs.
  • Logging – Enable debug output via the logging module.
  • Error handling – Catch HTTP errors (requests.exceptions.HTTPError) and retry with exponential back-off (see the sketch after this list).
  • Unit tests – Mock API responses to validate the merge and purge logic.
  • Monitoring – Run via cron or an Argo workflow and track record counts per run.
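
A minimal retry sketch (it reuses BASE_URL and HEADERS from the script above; the retry count and back-off base are arbitrary choices):

Python
import time
import requests

def gql_query_with_retry(query: str, variables: dict, max_retries: int = 5) -> dict:
    """Like gql_query, but retries failed requests with exponential back-off."""
    for attempt in range(max_retries):
        try:
            resp = requests.post(
                BASE_URL,  # defined in the script above
                json={"query": query, "variables": variables},
                headers=HEADERS,
            )
            resp.raise_for_status()
            return resp.json()["data"]
        except requests.exceptions.HTTPError:
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)  # 1 s, 2 s, 4 s, ...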


6. Advanced Use-Cases

  • Sync to SQL – Use SQLAlchemy to bulk-load the dataframe into a database.
  • Partitioning – Split the cache into separate CSVs or tables by year/month.
  • Integrity check – Store a SHA-256 hash of the payload and verify it on each run (see the sketch after this list).
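
A sketch of such a check (sorting by _id and hashing the CSV serialization are illustrative choices, not part of the API):

Python
import hashlib
import pandas as pd

def dataset_digest(df: pd.DataFrame) -> str:
    """Return a SHA-256 digest of a canonical CSV serialization of the cache."""
    payload = df.sort_values("_id").to_csv(index=False).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()

# Compare the digest with the one stored after the previous run (e.g. in a
# sidecar .sha256 file) to detect silent corruption of the cache.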


7. Conclusion

This approach lets you:

  1. Significantly cut down network traffic (only changed data).
  2. Keep your local cache fresh (add, update, delete).
  3. Easily automate the process with a clear, maintainable script.

By following this guide, you can integrate AXSMarine's polygon events into your data-management ecosystem while keeping bandwidth and storage usage minimal. šŸš€

