Syncing iCal/ICS feeds into Discourse topics (simple Python script, cron-friendly)

Hi Cathyy, thanks for your careful read! :raising_hands:
You were right that ICS_NAMESPACE support was already there. Since the last public ics2disc.py in this topic, here's what has changed:

  • Namespace handling: Clear priority order: --namespace → ICS_NAMESPACE → derived from URL host/path or file stem.
  • UID tag generation: Per-feed namespace + short UID hash, with enforced tag length and hashed suffix if needed.
  • Tag safety: Human/default tags are sanitised; if still over length they are skipped (not mutated). UID tags are truncated with hash.
  • Deterministic tags: De-duped and sorted to avoid churn.
  • Topic lookup: First try /tag/{uid_tag}.json, fall back to search.json.
  • First post fetch: Safe retrieval with include_raw=1, fallback to /posts/{id}.json.
  • Title preservation: The auto-generated title is stored in an HTML comment marker; the visible title is only updated while it still matches that marker (i.e. nobody has renamed the topic by hand).
  • Event body builder: Better [event] BBCode that handles all-day vs timed events correctly, includes timezone, location, url, minimal="true", and an RSVP mode (--rsvp).
  • Future-only import: --future-only skips past events, with ~1h grace.
  • Recurrence masters: Skips unexpanded RRULE masters.
  • Create/update hardening: Proper JSON tags key, body padding to clear min-post length, error logging, separate updates for body vs title/tags, dry-run supported.
  • Category handling: Category is used only at create; never changed on update.
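
For reference, a freshly synced first post now looks roughly like this (sample values from an --rsvp run; trim any [event] attributes your calendar plugin version doesn't support):

<!-- ics2disc:title: Monthly meetup -->
[event start="2024-05-02 19:00" end="2024-05-02 21:00" timezone="Europe/London" status="public" minimal="true"]
[/event]
**When:** Thu 02 May 2024, 19:00 – 21:00 (BST)
**Where:** Town Hall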

Current script

#!/usr/bin/env python3
# Sync ICS -> Discourse topics (create/update by UID)
# Preserves human-edited titles; never moves categories on update.
# Requirements: requests, python-dateutil, icalendar
import os, sys, argparse, re, logging, hashlib
from datetime import datetime, date, timedelta
from dateutil.tz import gettz
from icalendar import Calendar
from urllib.parse import urlparse
import requests

log = logging.getLogger("ics2disc")
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

# --- Config from environment ---
BASE = os.environ.get("DISCOURSE_BASE_URL", "").rstrip("/")
API_KEY = os.environ.get("DISCOURSE_API_KEY")
API_USER = os.environ.get("DISCOURSE_API_USERNAME")
CATEGORY_ID = int(os.environ.get("DISCOURSE_CATEGORY_ID", "1"))
DEFAULT_TAGS = [t.strip() for t in os.environ.get("DEFAULT_TAGS", "").split(",") if t.strip()]
SITE_TZ_NAME = os.environ.get("SITE_TZ", "UTC")  # IANA zone name; reused for the [event] timezone attribute
SITE_TZ = gettz(SITE_TZ_NAME)
ICS_NAMESPACE = os.environ.get("ICS_NAMESPACE", "").strip()  # empty = derive from the feed

if not BASE or not API_KEY or not API_USER:
    missing = [k for k,v in [
        ("DISCOURSE_BASE_URL", BASE),
        ("DISCOURSE_API_KEY", API_KEY),
        ("DISCOURSE_API_USERNAME", API_USER)
    ] if not v]
    sys.exit(f"ERROR: Missing env: {', '.join(missing)}")

# --- Helpers ---
def _session():
    s = requests.Session()
    s.headers.update({
        "Api-Key": API_KEY,
        "Api-Username": API_USER,
        "Content-Type": "application/json"
    })
    return s

def _as_dt(v, tz):
    # accepts datetime/date/ical date or datetime
    if isinstance(v, datetime):
        if v.tzinfo:
            return v.astimezone(tz)
        return v.replace(tzinfo=tz)
    if isinstance(v, date):
        return datetime(v.year, v.month, v.day, tzinfo=tz)
    try:
        # icalendar may return date/datetime
        if hasattr(v, "dt"):
            return _as_dt(v.dt, tz)
    except Exception:
        pass
    return None

def human_range(start_dt, end_dt):
    if not start_dt or not end_dt:
        return ""
    same_day = start_dt.date() == end_dt.date()
    if same_day:
        return f"{start_dt.strftime('%a %d %b %Y, %H:%M')} – {end_dt.strftime('%H:%M')} ({start_dt.tzname()})"
    return f"{start_dt.strftime('%a %d %b %Y, %H:%M')} – {end_dt.strftime('%a %d %b %Y, %H:%M')} ({start_dt.tzname()})"

# Hidden marker storing the auto-generated title in the first post,
# e.g. <!-- ics2disc:title: Monthly meetup -->
TITLE_MARKER_RE = re.compile(r"<!--\s*ics2disc:title:\s*(.+?)\s*-->", re.I | re.S)

def extract_marker_title(raw):
    m = TITLE_MARKER_RE.search(raw or "")
    return m.group(1).strip() if m else None

def build_body(vevent, tz, tz_name="UTC", rsvp=False):
    summary = str(vevent.get("summary") or "").strip()
    desc = str(vevent.get("description") or "").strip()
    loc = str(vevent.get("location") or "").strip()
    url = str(vevent.get("url") or "").strip()
    raw_start = vevent.decoded("dtstart")
    # all-day events decode to plain dates, timed ones to datetimes
    all_day = isinstance(raw_start, date) and not isinstance(raw_start, datetime)
    start_dt = _as_dt(raw_start, tz)
    end_dt = _as_dt(vevent.decoded("dtend"), tz) if vevent.get("dtend") else None
    fmt = "%Y-%m-%d" if all_day else "%Y-%m-%d %H:%M"
    attrs = [f'start="{start_dt.strftime(fmt)}"']
    if end_dt:
        attrs.append(f'end="{end_dt.strftime(fmt)}"')
    if not all_day:
        attrs.append(f'timezone="{tz_name}"')
    status = "public" if rsvp else "standalone"  # public enables RSVP buttons
    attrs.append(f'status="{status}"')
    if url:
        attrs.append(f'url="{url}"')
    attrs.append('minimal="true"')
    when = human_range(start_dt, end_dt) if start_dt and end_dt else (
        start_dt.strftime("%a %d %b %Y, %H:%M %Z") if start_dt else "")
    parts = [f"<!-- ics2disc:title: {summary} -->",
             f"[event {' '.join(attrs)}]\n[/event]"]
    if when:
        parts.append(f"**When:** {when}")
    if loc:
        parts.append(f"**Where:** {loc}")
    if desc:
        parts.append("")
        parts.append(desc)
    raw = "\n".join(parts).strip()
    return raw, summary, start_dt

TAG_MAX_LEN = 20  # Discourse's default max_tag_length site setting; raise to match yours

def make_uid_tag(namespace, uid):
    # compress the UID to a short hash so the per-event tag stays unique,
    # then truncate the namespace part if the site tag limit would be exceeded
    h = hashlib.sha1(uid.encode("utf-8")).hexdigest()[:10]
    base = f"{namespace}-uid-{h}".lower()
    if len(base) > TAG_MAX_LEN:
        keep = TAG_MAX_LEN - len(h) - 1
        base = (f"{base[:keep].rstrip('-')}-{h}" if keep > 0 else h)[:TAG_MAX_LEN]
    return base
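
# Helpers backing the "Tag safety" and namespace-derivation bullets above.
def sanitize_tag(tag):
    # lowercase and squash anything outside [a-z0-9-]; if the result is empty
    # or still over the site limit, skip the tag (return None) rather than mutate it
    t = re.sub(r"[^a-z0-9]+", "-", str(tag or "").strip().lower()).strip("-")
    return t if t and len(t) <= TAG_MAX_LEN else None

def derive_namespace(ics_url, ics_file):
    # fallback namespace from the feed URL host/path, or the local file stem
    if ics_url:
        p = urlparse(ics_url)
        stem = os.path.splitext(p.path)[0].strip("/").replace("/", "-")
        raw = "-".join(x for x in (p.hostname or "", stem) if x)
    else:
        raw = os.path.splitext(os.path.basename(ics_file or ""))[0]
    return sanitize_tag(raw) or "ics"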

def find_topic_by_uid_tag(s, uid_tag):
    """
    Look up an existing topic by its per-event UID tag.
    Prefer API JSON endpoints (avoid HTML routes).
    Return topic_id (int) or None.
    """
    # 1) Try the tag JSON endpoint (works once the tag exists)
    try:
        r = s.get(f"{BASE}/tag/{uid_tag}.json", timeout=30)
        if r.status_code == 404:
            log.debug("Tag %s not found via /tag JSON (404).", uid_tag)
        elif r.status_code == 403:
            log.debug("Forbidden on /tag JSON for %s (403) β€” will try search.json.", uid_tag)
        else:
            r.raise_for_status()
            data = r.json() or {}
            topics = ((data.get("topic_list") or {}).get("topics")) or []
            for t in topics:
                if uid_tag in (t.get("tags") or []):
                    log.info("Found existing topic %s via /tag JSON for %s.", t.get("id"), uid_tag)
                    return t.get("id")
    except Exception as e:
        log.debug("Tag JSON lookup failed for %s: %s", uid_tag, e)

    # 2) Fallback: Search API (works even if tag page is restricted)
    try:
        r = s.get(f"{BASE}/search.json", params={"q": f"tag:{uid_tag}"}, timeout=30)
        r.raise_for_status()
        data = r.json() or {}
        topics = data.get("topics") or []
        for t in topics:
            if uid_tag in (t.get("tags") or []):
                log.info("Found existing topic %s via search.json for %s.", t.get("id"), uid_tag)
                return t.get("id")
        log.info("No existing topic found for %s.", uid_tag)
    except Exception as e:
        log.warning("Search API lookup failed for %s: %s", uid_tag, e)

    return None

def get_first_post_raw(s, topic_id):
    """
    Return (first_post_id, raw, topic_title) by fetching with include_raw=1;
    fall back to /posts/{id}.json when raw is missing from the stream.
    """
    r = s.get(f"{BASE}/t/{topic_id}.json", params={"include_raw": 1}, timeout=30)
    r.raise_for_status()
    data = r.json() or {}
    title = data.get("title")
    posts = ((data.get("post_stream") or {}).get("posts")) or []
    if posts:
        fp = posts[0]
        fp_id = fp.get("id")
        raw = fp.get("raw")
        if raw is not None:
            return fp_id, raw, title
        if fp_id:
            r2 = s.get(f"{BASE}/posts/{fp_id}.json", params={"include_raw": 1}, timeout=30)
            r2.raise_for_status()
            d2 = r2.json() or {}
            if "raw" in d2:
                return fp_id, d2["raw"], title
    return None, None, title

def update_first_post(s, post_id, new_raw, reason=None):
    """
    Update an existing post; optional edit_reason for a clearer revision history.
    """
    # PostsController expects the fields nested under "post"
    payload = {"post": {"raw": new_raw}}
    if reason:
        payload["post"]["edit_reason"] = reason
    r = s.put(f"{BASE}/posts/{post_id}.json", json=payload, timeout=60)
    if r.status_code >= 400:
        log.error("Update post %s failed %s: %s", post_id, r.status_code, r.text)
    r.raise_for_status()
    return r.json()
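
# Rename a topic without touching the first post. The PUT /t/-/{id}.json route
# with a bare {"title": ...} payload is what worked against a recent Discourse;
# verify against your instance if renames misbehave.
def update_topic_title(s, topic_id, new_title):
    r = s.put(f"{BASE}/t/-/{topic_id}.json", json={"title": new_title}, timeout=60)
    if r.status_code >= 400:
        log.error("Update title for topic %s failed %s: %s", topic_id, r.status_code, r.text)
    r.raise_for_status()
    return r.json()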

def make_safe_title(summary: str, dtstart_dt: datetime | None) -> str:
    """
    Build a Discourse-friendly title from event summary + start time.
    Collapses repeats, adds time for entropy, enforces some diversity.
    """
    summary = (summary or "").strip()
    summary = re.sub(r'(.)\1{2,}', r'\1\1', summary)  # collapse AAAA -> AA
    when = dtstart_dt.strftime("%a %d %b %Y %H:%M") if dtstart_dt else ""
    title = f"{summary} β€” {when}".strip(" β€”")
    alnums = [c.lower() for c in title if c.isalnum()]
    if len(set(alnums)) < 6:
        title = (title + " – event").strip()
    return title[:120]

def create_topic(s, title, raw, category_id, tags, dtstart_dt=None):
    """
    Create a new topic. Pads body to satisfy site min post length.
    Retries once with sanitized title if validator complains.
    Returns (topic_id, first_post_id).
    """
    MIN_BODY = 40
    if raw is None:
        raw = ""
    if len(raw) < MIN_BODY:
        raw = (raw + "\n\n(autogenerated by ics2disc)").ljust(MIN_BODY + 1, " ")

    payload = {"title": title, "raw": raw, "category": category_id}
    if tags:
        payload["tags"] = tags

    r = s.post(f"{BASE}/posts.json", json=payload, timeout=60)
    if r.status_code == 422:
        try:
            j = r.json()
            errs = " ".join(j.get("errors") or [])
        except Exception:
            errs = r.text
        if "Title seems unclear" in errs or "title" in errs.lower():
            safe_title = make_safe_title(title, dtstart_dt)
            if safe_title != title:
                log.warning("Title rejected; retrying with sanitized title: %r", safe_title)
                payload["title"] = safe_title
                r = s.post(f"{BASE}/posts.json", json=payload, timeout=60)

    if r.status_code >= 400:
        log.error("Create failed %s: %s", r.status_code, r.text)
    r.raise_for_status()
    data = r.json()
    return data["topic_id"], data["id"]

def process_vevent(s, vevent, args, namespace):
    uid = str(vevent.get("uid") or "").strip()  # guard before str(): str(None) would be "None"
    if not uid:
        log.warning("Skipping VEVENT with no UID")
        return
    if vevent.get("rrule"):
        # unexpanded recurrence master; expanded feeds give each occurrence its own UID/RECURRENCE-ID
        log.info("Skipping unexpanded RRULE master UID=%s", uid)
        return

    fresh_body, summary, start_dt = build_body(vevent, SITE_TZ, tz_name=SITE_TZ_NAME, rsvp=args.rsvp)

    if args.future_only and start_dt and start_dt < datetime.now(SITE_TZ) - timedelta(hours=1):
        log.info("Skipping past event UID=%s (--future-only)", uid)
        return

    # per-event tag from UID; human/default tags are sanitised or skipped,
    # then the set is de-duped and sorted so reruns don't churn tags
    uid_tag = make_uid_tag(namespace, uid)
    tags = [t for t in (sanitize_tag(x) for x in DEFAULT_TAGS + [namespace]) if t]
    tags = sorted(set(tags + [uid_tag]))

    topic_id = find_topic_by_uid_tag(s, uid_tag)

    if args.dry_run:
        action = f"update topic #{topic_id}" if topic_id else "create a topic"
        log.info("[dry-run] Would %s for UID %s (tags: %s)", action, uid, ", ".join(tags))
        return

    if topic_id:
        first_post_id, old_raw, visible_title = get_first_post_raw(s, topic_id)
        if not first_post_id:
            log.warning("Could not fetch first post for topic %s; skipping update.", topic_id)
            return

        if old_raw.strip() == fresh_body.strip():
            log.info("No content change for topic %s.", topic_id)
        else:
            log.info("Updating topic #%s for UID %s ...", topic_id, uid)
            update_first_post(s, first_post_id, fresh_body, reason="ICS sync update")
            log.info("Updated topic #%s", topic_id)

        # Title preservation: only rename while the visible title still matches
        # the stored auto title, i.e. no human has edited it
        old_marker_title = extract_marker_title(old_raw)
        new_marker_title = extract_marker_title(fresh_body)
        if (old_marker_title and new_marker_title and visible_title
                and visible_title.strip() == old_marker_title
                and new_marker_title != old_marker_title):
            log.info("Auto title changed; renaming topic #%s", topic_id)
            update_topic_title(s, topic_id, new_marker_title)
    else:
        auto_title = summary or f"Event – {uid[:8]}"
        log.info("Creating new topic for UID %s ...", uid)
        created_topic_id, first_post_id = create_topic(
            s, auto_title, fresh_body, CATEGORY_ID, tags, dtstart_dt=start_dt
        )
        log.info("Created topic #%s (post %s)", created_topic_id, first_post_id)

def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--ics-url", help="URL to .ics file")
    ap.add_argument("--ics-file", help="Path to local .ics file")
    ap.add_argument("--namespace", help="Tag namespace (defaults to ICS_NAMESPACE env, then the feed URL/file name)")
    ap.add_argument("--future-only", action="store_true", help="Skip events that started more than ~1h ago")
    ap.add_argument("--rsvp", action="store_true", help='Publish events with status="public" so members can RSVP')
    ap.add_argument("--dry-run", action="store_true", help="Log what would happen without writing to Discourse")
    ap.add_argument("--skip-errors", action="store_true", help="Continue on event errors")
    args = ap.parse_args()

    if not (args.ics_url or args.ics_file):
        sys.exit("ERROR: Provide --ics-url or --ics-file")

    # Namespace priority: --namespace, then ICS_NAMESPACE, then derived from the feed
    feed_namespace = (
        (args.namespace or "").strip()
        or ICS_NAMESPACE
        or derive_namespace(args.ics_url, args.ics_file)
    )

    # fetch ICS
    if args.ics_url:
        log.info(f"Fetching ICS: {args.ics_url}")
        r = requests.get(args.ics_url, timeout=60)
        r.raise_for_status()
        data = r.content
    else:
        with open(args.ics_file, "rb") as f:
            data = f.read()

    log.info(f"Using namespace: {feed_namespace}")
    cal = Calendar.from_ical(data)
    s = _session()
    for comp in cal.walk("VEVENT"):
        try:
            process_vevent(s, comp, args, feed_namespace)
        except Exception as e:
            if args.skip_errors:
                log.error(f"Error on event UID={comp.get('uid')}: {e}")
                continue
            raise

if __name__ == "__main__":
    main()
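
Cron example

Since the whole point is being cron-friendly, here's roughly how I'd wire it up; the paths, schedule, and user are placeholders, and the env file is assumed to be a shell-sourceable file exporting DISCOURSE_BASE_URL, DISCOURSE_API_KEY, DISCOURSE_API_USERNAME, and friends:

# /etc/cron.d/ics2disc: hourly sync, credentials kept in a private env file
0 * * * * syncuser . /etc/ics2disc.env && /usr/local/bin/ics2disc.py --ics-url https://example.org/events.ics --future-only >> /var/log/ics2disc.log 2>&1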
