Hi Cathy – thanks for your careful read! You were right that `ICS_NAMESPACE` support was already there. Since the last public `ics2disc.py` in this topic, here's what has changed:
- Namespace handling: Clear priority order: `--namespace` → `ICS_NAMESPACE` → derived from the URL host/path or the file stem (see the sketch after this list).
- UID tag generation: Per-feed namespace + short UID hash, with enforced tag length and a hashed suffix if needed.
- Tag safety: Human/default tags are sanitised; if still over length they are skipped (not mutated). UID tags are truncated with a hash (sketch below).
- Deterministic tags: De-duped and sorted to avoid churn.
- Topic lookup: First try `/tag/{uid_tag}.json`, then fall back to `search.json`.
- First post fetch: Safe retrieval with `include_raw=1`, with a fallback to `/posts/{id}.json`.
- Title preservation: The auto title is stored in an HTML comment marker. The script only updates the visible title if it still matches that marker.
- Event body builder: Better `[event]` BBCode – handles all-day vs timed correctly; includes `timezone`, `location`, `url`, `minimal="true"`, and RSVP mode (`--rsvp`). See the sketch below.
- Future-only import: `--future-only` skips past events, with ~1h grace (sketch below).
- Recurrence masters: Skips unexpanded RRULE masters (same sketch).
- Create/update hardening: Proper JSON `tags` key, body padding to clear the min-post length, error logging, separate updates for body vs title/tags, and dry-run support.
- Category handling: The category is set only at create; it is never changed on update.
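For reference, here is roughly how the namespace derivation could work when neither `--namespace` nor `ICS_NAMESPACE` is set. `derive_namespace` is a hypothetical helper for illustration; it is not part of the script below:

```python
import re
from pathlib import Path
from urllib.parse import urlparse

def derive_namespace(ics_url=None, ics_file=None, fallback="ics"):
    # Hypothetical helper: derive a tag namespace from the feed source.
    if ics_url:
        p = urlparse(ics_url)
        raw = p.hostname or p.path or fallback  # prefer the host, else the path
    elif ics_file:
        raw = Path(ics_file).stem  # file name without the .ics extension
    else:
        return fallback
    # keep only tag-safe characters, collapse everything else to "-"
    slug = re.sub(r"[^a-z0-9-]+", "-", raw.lower()).strip("-")
    return slug or fallback
```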
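The tag-safety rules could look something like the sketch below. The 20-character limit is an assumption (Discourse's default `max_tag_length`); check your site setting:

```python
import hashlib
import re

MAX_TAG_LEN = 20  # assumption: the site's max_tag_length setting

def sanitise_tag(tag):
    # Lowercase and replace anything that isn't tag-safe with "-".
    return re.sub(r"[^a-z0-9-]+", "-", tag.lower()).strip("-")

def safe_human_tag(tag):
    # Human/default tags: sanitise, but if still over the limit,
    # skip them (return None) rather than mutate them further.
    t = sanitise_tag(tag)
    return t if t and len(t) <= MAX_TAG_LEN else None

def safe_uid_tag(tag):
    # UID tags must survive, so truncate and append a short hash suffix.
    if len(tag) <= MAX_TAG_LEN:
        return tag
    h = hashlib.sha1(tag.encode("utf-8")).hexdigest()[:6]
    return f"{tag[:MAX_TAG_LEN - 7]}-{h}"
```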
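Future-only filtering and the recurrence-master skip amount to a small pre-check per VEVENT; a sketch, assuming the ~1h grace period (`start_dt` is the timezone-aware start, as produced by `_as_dt` in the script below):

```python
from datetime import datetime, timedelta

def should_skip(vevent, start_dt, future_only):
    # Unexpanded recurrence masters still carry an RRULE; skip them outright.
    if vevent.get("rrule"):
        return True
    if future_only and start_dt:
        # ~1 hour of grace so an event that only just started still imports
        if start_dt < datetime.now(start_dt.tzinfo) - timedelta(hours=1):
            return True
    return False
```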
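And the `[event]` builder, sketched with the attributes listed above. Treat the exact attribute names (especially the RSVP one, where I've guessed `status="public"`) as assumptions to verify against your discourse-calendar version:

```python
def event_bbcode(start_dt, end_dt, all_day, timezone="UTC",
                 location=None, url=None, minimal=True, rsvp=False):
    # All-day events get date-only values; timed events get date + time.
    fmt = "%Y-%m-%d" if all_day else "%Y-%m-%d %H:%M"
    attrs = [f'start="{start_dt.strftime(fmt)}"']
    if end_dt:
        attrs.append(f'end="{end_dt.strftime(fmt)}"')
    attrs.append(f'timezone="{timezone}"')
    if location:
        attrs.append(f'location="{location}"')
    if url:
        attrs.append(f'url="{url}"')
    if minimal:
        attrs.append('minimal="true"')
    if rsvp:
        attrs.append('status="public"')  # assumption: RSVP enabled via status
    return f"[event {' '.join(attrs)}]\n[/event]"
```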
Current script:

```python
#!/usr/bin/env python3
# Sync ICS -> Discourse topics (create/update by UID)
# Preserves human-edited titles; never moves categories on update.
# Requirements: requests, python-dateutil, icalendar
import os, sys, argparse, re, logging, hashlib
from datetime import datetime, date, timedelta
from dateutil.tz import gettz
from icalendar import Calendar
from urllib.parse import urlparse
import requests
log = logging.getLogger("ics2disc")
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
# --- Config from environment ---
BASE = os.environ.get("DISCOURSE_BASE_URL", "").rstrip("/")
API_KEY = os.environ.get("DISCOURSE_API_KEY")
API_USER = os.environ.get("DISCOURSE_API_USERNAME")
CATEGORY_ID = int(os.environ.get("DISCOURSE_CATEGORY_ID", "1"))
DEFAULT_TAGS = [t.strip() for t in os.environ.get("DEFAULT_TAGS", "").split(",") if t.strip()]
SITE_TZ = gettz(os.environ.get("SITE_TZ", "UTC"))
ICS_NAMESPACE = os.environ.get("ICS_NAMESPACE", "ics")
if not BASE or not API_KEY or not API_USER:
missing = [k for k,v in [
("DISCOURSE_BASE_URL", BASE),
("DISCOURSE_API_KEY", API_KEY),
("DISCOURSE_API_USERNAME", API_USER)
] if not v]
sys.exit(f"ERROR: Missing env: {', '.join(missing)}")
# --- Helpers ---
def _session():
s = requests.Session()
s.headers.update({
"Api-Key": API_KEY,
"Api-Username": API_USER,
"Content-Type": "application/json"
})
return s
def _as_dt(v, tz):
# accepts datetime/date/ical date or datetime
if isinstance(v, datetime):
if v.tzinfo:
return v.astimezone(tz)
return v.replace(tzinfo=tz)
if isinstance(v, date):
return datetime(v.year, v.month, v.day, tzinfo=tz)
try:
# icalendar may return date/datetime
if hasattr(v, "dt"):
return _as_dt(v.dt, tz)
except Exception:
pass
return None
def human_range(start_dt, end_dt):
if not start_dt or not end_dt:
return ""
same_day = start_dt.date() == end_dt.date()
if same_day:
return f"{start_dt.strftime('%a %d %b %Y, %H:%M')} β {end_dt.strftime('%H:%M')} ({start_dt.tzname()})"
return f"{start_dt.strftime('%a %d %b %Y, %H:%M')} β {end_dt.strftime('%a %d %b %Y, %H:%M')} ({start_dt.tzname()})"
def extract_marker_title(raw):
    # Pull the auto title out of the "[event] ... [/event]" marker in the raw body.
    m = re.search(r"\[event\]\s*(.+?)\s*\[\/event\]", raw or "", re.I|re.S)
    return m.group(1).strip() if m else None
def build_body(vevent, tz):
summary = (vevent.get("summary") or "").strip()
desc = (vevent.get("description") or "").strip()
loc = (vevent.get("location") or "").strip()
start_dt = _as_dt(vevent.decoded("dtstart"), tz)
end_dt = _as_dt(vevent.decoded("dtend"), tz) if vevent.get("dtend") else None
when = human_range(start_dt, end_dt) if start_dt and end_dt else (start_dt.strftime("%a %d %b %Y, %H:%M %Z") if start_dt else "")
parts = []
parts.append(f"[event] {summary} [/event]")
if when:
parts.append(f"**When:** {when}")
if loc:
parts.append(f"**Where:** {loc}")
if desc:
parts.append("")
parts.append(desc)
raw = "\n".join(parts).strip()
return raw, summary, start_dt
def make_uid_tag(namespace, uid):
# compress UID to a short slug so tags stay within site length limits
h = hashlib.sha1(uid.encode("utf-8")).hexdigest()[:10]
# namespace-uid-<hash>
base = f"{namespace}-uid-{h}"
return base.lower()
def find_topic_by_uid_tag(s, uid_tag):
"""
Look up an existing topic by its per-event UID tag.
Prefer API JSON endpoints (avoid HTML routes).
Return topic_id (int) or None.
"""
# 1) Try the tag JSON endpoint (works once the tag exists)
try:
r = s.get(f"{BASE}/tag/{uid_tag}.json", timeout=30)
if r.status_code == 404:
log.debug("Tag %s not found via /tag JSON (404).", uid_tag)
elif r.status_code == 403:
log.debug("Forbidden on /tag JSON for %s (403) β will try search.json.", uid_tag)
else:
r.raise_for_status()
data = r.json() or {}
topics = ((data.get("topic_list") or {}).get("topics")) or []
for t in topics:
if uid_tag in (t.get("tags") or []):
log.info("Found existing topic %s via /tag JSON for %s.", t.get("id"), uid_tag)
return t.get("id")
except Exception as e:
log.debug("Tag JSON lookup failed for %s: %s", uid_tag, e)
# 2) Fallback: Search API (works even if tag page is restricted)
try:
r = s.get(f"{BASE}/search.json", params={"q": f"tag:{uid_tag}"}, timeout=30)
r.raise_for_status()
data = r.json() or {}
topics = data.get("topics") or []
for t in topics:
if uid_tag in (t.get("tags") or []):
log.info("Found existing topic %s via search.json for %s.", t.get("id"), uid_tag)
return t.get("id")
log.info("No existing topic found for %s.", uid_tag)
except Exception as e:
log.warning("Search API lookup failed for %s: %s", uid_tag, e)
return None
def get_first_post_raw(s, topic_id):
    """
    Return (first_post_id, raw, topic_title) by fetching with include_raw=1;
    fall back to /posts/{id}.json for the raw if needed.
    """
    r = s.get(f"{BASE}/t/{topic_id}.json", params={"include_raw": 1}, timeout=30)
    r.raise_for_status()
    data = r.json() or {}
    title = data.get("title")
    posts = ((data.get("post_stream") or {}).get("posts")) or []
    if posts:
        fp = posts[0]
        fp_id = fp.get("id")
        raw = fp.get("raw")
        if raw is not None:
            return fp_id, raw, title
        if fp_id:
            r2 = s.get(f"{BASE}/posts/{fp_id}.json", params={"include_raw": 1}, timeout=30)
            r2.raise_for_status()
            d2 = r2.json() or {}
            if "raw" in d2:
                return fp_id, d2["raw"], title
    return None, None, title
def update_first_post(s, post_id, new_raw, reason=None):
"""
Update existing post; optional edit_reason for clearer logs.
"""
payload = {"raw": new_raw}
if reason:
payload["edit_reason"] = reason
r = s.put(f"{BASE}/posts/{post_id}.json", json=payload, timeout=60)
if r.status_code >= 400:
log.error("Update post %s failed %s: %s", post_id, r.status_code, r.text)
r.raise_for_status()
return r.json()
def make_safe_title(summary: str, dtstart_dt: datetime | None) -> str:
"""
Build a Discourse-friendly title from event summary + start time.
Collapses repeats, adds time for entropy, enforces some diversity.
"""
summary = (summary or "").strip()
summary = re.sub(r'(.)\1{2,}', r'\1\1', summary) # collapse AAAA -> AA
when = dtstart_dt.strftime("%a %d %b %Y %H:%M") if dtstart_dt else ""
title = f"{summary} β {when}".strip(" β")
alnums = [c.lower() for c in title if c.isalnum()]
if len(set(alnums)) < 6:
title = (title + " β event").strip()
return title[:120]
def create_topic(s, title, raw, category_id, tags, dtstart_dt=None):
"""
Create a new topic. Pads body to satisfy site min post length.
Retries once with sanitized title if validator complains.
Returns (topic_id, first_post_id).
"""
MIN_BODY = 40
if raw is None:
raw = ""
if len(raw) < MIN_BODY:
raw = (raw + "\n\n(autogenerated by ics2disc)").ljust(MIN_BODY + 1, " ")
payload = {"title": title, "raw": raw, "category": category_id}
if tags:
payload["tags"] = tags
r = s.post(f"{BASE}/posts.json", json=payload, timeout=60)
if r.status_code == 422:
try:
j = r.json()
errs = " ".join(j.get("errors") or [])
except Exception:
errs = r.text
if "Title seems unclear" in errs or "title" in errs.lower():
safe_title = make_safe_title(title, dtstart_dt)
if safe_title != title:
log.warning("Title rejected; retrying with sanitized title: %r", safe_title)
payload["title"] = safe_title
r = s.post(f"{BASE}/posts.json", json=payload, timeout=60)
if r.status_code >= 400:
log.error("Create failed %s: %s", r.status_code, r.text)
r.raise_for_status()
data = r.json()
return data["topic_id"], data["id"]
def process_vevent(s, vevent, args, namespace):
    # guard before str(): str(None) would be "None" and slip past the check
    uid = str(vevent.get("uid") or "").strip()
if not uid:
log.warning("Skipping VEVENT with no UID")
return
fresh_body, summary, start_dt = build_body(vevent, SITE_TZ)
    # per-event tag from UID
    uid_tag = make_uid_tag(namespace, uid)
    # de-dupe and sort so repeated runs produce an identical tag list (no churn)
    tags = sorted(set(DEFAULT_TAGS + [namespace, uid_tag]))
topic_id = find_topic_by_uid_tag(s, uid_tag)
    if topic_id:
        log.info("Syncing existing topic %s for UID %s.", topic_id, uid)
        # Fetch the old raw (and the currently visible title) safely
        first_post_id, old_raw, visible_title = get_first_post_raw(s, topic_id)
        if not first_post_id:
            log.warning("Could not fetch first post for topic %s; skipping update.", topic_id)
            return
        old_marker_title = extract_marker_title(old_raw)
        new_marker_title = extract_marker_title(fresh_body)
        if old_raw.strip() == fresh_body.strip():
            log.info("No content change for topic %s.", topic_id)
        else:
            log.info("Updating topic #%s for UID %s …", topic_id, uid)
            update_first_post(s, first_post_id, fresh_body, reason="ICS sync update")
            # Only sync the visible title if it still matches the previous
            # auto title; a human-edited title is never overwritten.
            if (old_marker_title and new_marker_title
                    and visible_title == old_marker_title
                    and new_marker_title != old_marker_title):
                rt = s.put(f"{BASE}/t/-/{topic_id}.json",
                           json={"title": new_marker_title}, timeout=60)
                if rt.status_code >= 400:
                    log.error("Title update for topic %s failed %s: %s",
                              topic_id, rt.status_code, rt.text)
            log.info("Updated topic #%s", topic_id)
    else:
        # lookup helper already logged that nothing was found
        auto_title = summary or f"Event – {uid[:8]}"
        log.info("Creating new topic for UID %s …", uid)
created_topic_id, first_post_id = create_topic(
s, auto_title, fresh_body, CATEGORY_ID, tags, dtstart_dt=start_dt
)
log.info(f"Created topic #{created_topic_id} (post {first_post_id})")
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--ics-url", help="URL to .ics file")
ap.add_argument("--ics-file", help="Path to local .ics file")
ap.add_argument("--namespace", help="Tag namespace (defaults to ICS_NAMESPACE env)")
ap.add_argument("--skip-errors", action="store_true", help="Continue on event errors")
args = ap.parse_args()
feed_namespace = (args.namespace or ICS_NAMESPACE or "ics").strip()
if not (args.ics_url or args.ics_file):
sys.exit("ERROR: Provide --ics-url or --ics-file")
# fetch ICS
if args.ics_url:
url = args.ics_url
log.info(f"Fetching ICS: {url}")
r = requests.get(url, timeout=60)
r.raise_for_status()
data = r.content
else:
with open(args.ics_file, "rb") as f:
data = f.read()
log.info(f"Using namespace: {feed_namespace}")
cal = Calendar.from_ical(data)
s = _session()
for comp in cal.walk("VEVENT"):
try:
process_vevent(s, comp, args, feed_namespace)
except Exception as e:
if args.skip_errors:
log.error(f"Error on event UID={comp.get('uid')}: {e}")
continue
raise
if __name__ == "__main__":
    main()
```
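For completeness, an example invocation (all values are placeholders):

```bash
export DISCOURSE_BASE_URL="https://forum.example.org"
export DISCOURSE_API_KEY="<api key>"
export DISCOURSE_API_USERNAME="system"
export DISCOURSE_CATEGORY_ID="12"
export SITE_TZ="Europe/London"
python3 ics2disc.py --ics-url "https://example.org/events.ics" --namespace club --skip-errors
```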