Syncing iCal/ICS feeds into Discourse

Just to update on my own setup:

My network at IONOS is not flaky, so I won’t be needing Change 2 (the retry/backoff logic).
The rest of the changes are still useful in my case.

Here’s the script I will be using (post #20 with changes 1, 3, 4 and 5 from #21 applied):

ics2disc.py
#!/usr/bin/env python3
# Sync ICS -> Discourse topics (create/update by UID)
# Preserves human-edited titles; never moves categories on update.
# Requirements: requests, python-dateutil, icalendar
import os, sys, argparse, re, logging, hashlib
from datetime import datetime, date, timedelta
from dateutil.tz import gettz
from icalendar import Calendar
from urllib.parse import urlparse
import requests

log = logging.getLogger("ics2disc")
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

# --- Config from environment ---
# Base URL of the Discourse instance, e.g. "https://forum.example.org" (trailing slash stripped).
BASE = os.environ.get("DISCOURSE_BASE_URL", "").rstrip("/")
# API key and acting username sent on every request; presence is checked in main().
API_KEY = os.environ.get("DISCOURSE_API_KEY")
API_USER = os.environ.get("DISCOURSE_API_USERNAME", "system")
CATEGORY_ID = os.environ.get("DISCOURSE_CATEGORY_ID")  # numeric (string ok) - used on CREATE only
# Comma-separated tags applied to every synced topic; empty entries are dropped.
DEFAULT_TAGS = [t for t in os.environ.get("DISCOURSE_DEFAULT_TAGS", "").split(",") if t]
# IANA timezone name used for naive and all-day event times.
SITE_TZ = os.environ.get("SITE_TZ", "Europe/London")

# Prefer Meta-style env name; fall back to TAG_MAX_LEN; default 30.
TAG_MAX_LEN = int(os.environ.get("DISCOURSE_TAG_MAX_LEN", os.environ.get("TAG_MAX_LEN", "30")))

# --- HTTP helpers (Discourse API) ---
def _session():
    """Return a requests.Session pre-loaded with Discourse API auth headers."""
    sess = requests.Session()
    sess.headers["Api-Key"] = API_KEY
    sess.headers["Api-Username"] = API_USER
    sess.headers["Content-Type"] = "application/json"
    return sess

# --- Tag helpers (namespace + safe names + length handling) ---
_TAG_SAFE_RE = re.compile(r"[^a-z0-9\-]+")
_TAG_DASHES_RE = re.compile(r"-{2,}")

def _short_hash(text, n=10):
    return hashlib.sha1((text or "").encode("utf-8")).hexdigest()[:n]

def _sanitize_tag_base(s: str) -> str:
    """Lowercase, replace invalid chars with '-', squeeze dashes, trim."""
    s = (s or "").strip().lower()
    s = _TAG_SAFE_RE.sub("-", s)
    s = _TAG_DASHES_RE.sub("-", s).strip("-")
    return s or "event"

def _enforce_len_or_truncate(tag: str) -> str:
    """Return *tag* unchanged when it fits TAG_MAX_LEN; otherwise truncate it
    and append a short hash suffix so distinct long tags remain distinct.
    (Safe truncation — used only for our internal UID tags.)
    """
    if len(tag) > TAG_MAX_LEN:
        # Suffix is "-h" + 10 hex chars (12 chars total including the dash).
        suffix = "-h" + _short_hash(tag, 10)
        head = tag[: max(1, TAG_MAX_LEN - len(suffix))].rstrip("-")
        tag = (head + suffix)[:TAG_MAX_LEN]
    return tag

def _sanitize_user_tag_or_skip(tag: str):
    """Sanitize a human/default tag; return None (with a warning) if still too long.

    Matches the "don't silently mutate user tags" guidance: tags a person
    chose are never hash-truncated, only dropped.
    """
    sanitized = _sanitize_tag_base(tag)
    if len(sanitized) <= TAG_MAX_LEN:
        return sanitized
    log.warning(f"Skipping overlong tag (> {TAG_MAX_LEN}): {sanitized}")
    return None

def _sanitize_tag_list_user(tags):
    """Sanitize every tag in *tags*, dropping any that were skipped (None)."""
    candidates = (_sanitize_user_tag_or_skip(t) for t in (tags or []))
    return [tag for tag in candidates if tag]

def _derive_namespace(args, ics_source_kind, ics_source_value) -> str:
    """
    Pick the per-feed tag namespace. Priority:
      1) --namespace (CLI)
      2) ICS_NAMESPACE (env)
      3) Derived from URL host+tail or local filename stem
    """
    def _finish(raw):
        # All candidates go through the same sanitize + length-cap pipeline.
        return _enforce_len_or_truncate(_sanitize_tag_base(raw))

    cli_ns = getattr(args, "namespace", None)
    if cli_ns:
        return _finish(cli_ns)

    env_ns = os.environ.get("ICS_NAMESPACE")
    if env_ns:
        return _finish(env_ns)

    if ics_source_kind == "url":
        parsed = urlparse(ics_source_value)
        host = (parsed.netloc or "ics").replace(".", "-")
        segments = [seg for seg in (parsed.path or "").split("/") if seg]
        tail = segments[-1] if segments else "feed"
        return _finish(f"{host}-{tail}")

    # Local file: use the filename without extension.
    stem = os.path.splitext(os.path.basename(ics_source_value))[0] or "ics"
    return _finish(stem)

def _build_uid_tag(namespace: str, uid: str) -> str:
    """Build the per-event identity tag: "<namespace>-uid-<sha1 prefix>".

    The length cap is enforced on the final combined tag.
    """
    raw = _sanitize_tag_base(f"{namespace}-uid-{_short_hash(uid, 10)}")
    return _enforce_len_or_truncate(raw)

# --- Time helpers ---
def _as_dt(value, site_tz):
    """Coerce an icalendar-decoded value (date or datetime) to an aware datetime.

    Raises TypeError for any other type.
    """
    tz = gettz(site_tz)
    if isinstance(value, datetime):
        # Attach the site zone only when the value is naive.
        return value if value.tzinfo is not None else value.replace(tzinfo=tz)
    if isinstance(value, date):
        # Plain date (all-day value): midnight local time.
        return datetime(value.year, value.month, value.day, 0, 0, 0, tzinfo=tz)
    raise TypeError(f"Unsupported dt value type: {type(value)}")

def _is_all_day(vevent):
    """Return True when DTSTART is a DATE (all-day) rather than a DATE-TIME."""
    prop = vevent.get('dtstart')
    if not prop:
        return False
    # First preference: the explicit VALUE=DATE parameter on the property.
    try:
        params = getattr(prop, 'params', {})
        if params.get('VALUE') == 'DATE':
            return True
    except Exception:
        pass
    # Fallback: the decoded value's Python type (date but not datetime).
    decoded = vevent.decoded('dtstart', None)
    return isinstance(decoded, date) and not isinstance(decoded, datetime)

def _fmt_iso_z(dt):
    """Format an aware datetime as a UTC ISO-8601 string with a 'Z' suffix."""
    as_utc = dt.astimezone(gettz('UTC'))
    return as_utc.strftime("%Y-%m-%dT%H:%M:%SZ")

def _is_recurrence_master(vevent):
    # Skip master if it has RRULE but no specific RECURRENCE-ID (no expansion here).
    return bool(vevent.get('rrule')) and not vevent.get('recurrence-id')

# --- Body builder ([event] BBCode) ---
def build_body(vevent, site_tz, rsvp=False):
    """Render one VEVENT as the topic body: an [event] BBCode block, then the
    description (if any) after a horizontal rule.

    Returns (auto_title, body). *rsvp* switches the event status attribute
    from "standalone" to "public".
    """
    title = str(vevent.get('summary', 'Untitled')).strip() or "Untitled"
    desc = str(vevent.get('description', '')).strip()
    url = str(vevent.get('url', '')).strip()
    location = str(vevent.get('location', '')).strip()

    allday = _is_all_day(vevent)
    dtstart_raw = vevent.decoded('dtstart')
    dtend_raw = vevent.decoded('dtend', None)

    start_dt = _as_dt(dtstart_raw, site_tz)
    # Missing DTEND: default duration is one day (all-day) or one hour (timed).
    if dtend_raw is None:
        dtend_raw = (start_dt + (timedelta(days=1) if allday else timedelta(hours=1)))
    end_dt = _as_dt(dtend_raw, site_tz)

    if allday:
        # All-day events use bare dates; ICS DTEND is exclusive while the
        # BBCode end date is inclusive, so pull it back one day (never
        # earlier than the start date).
        start_attr = start_dt.strftime("%Y-%m-%d")
        if (end_dt - start_dt) >= timedelta(days=1):
            end_attr = (end_dt - timedelta(days=1)).strftime("%Y-%m-%d")
        else:
            end_attr = start_attr
        event_open = f'[event status="{"public" if rsvp else "standalone"}" timezone="{site_tz}" start="{start_attr}" end="{end_attr}"'
    else:
        # Timed events use full UTC timestamps.
        event_open = f'[event status="{"public" if rsvp else "standalone"}" timezone="{site_tz}" start="{_fmt_iso_z(start_dt)}" end="{_fmt_iso_z(end_dt)}"'
    if location:
        event_open += f' location="{location}"'
    if url:
        event_open += f' url="{url}"'
    event_open += ' minimal="true"]'

    lines = [event_open, title, '[/event]']
    if desc:
        lines += ["", "---", "", desc]
    body = "\n".join(lines).strip()
    return title, body

# --- Marker to preserve human title edits ---
# Hidden HTML comment appended to the post body; it records the auto-generated
# title so later syncs can tell whether a human renamed the topic.
MARKER_RE = re.compile(r'<!--\s*ics-sync:title="(.*?)"\s*-->')

def add_marker(body, auto_title):
    """Append the hidden title marker to *body*."""
    return (body + f'\n\n<!-- ics-sync:title="{auto_title}" -->').strip()

def strip_marker(text):
    """Remove any title marker(s) from *text* (handles None)."""
    return MARKER_RE.sub("", text or "").strip()

def extract_marker_title(text):
    """Return the recorded auto-title, or None when no marker is present."""
    match = MARKER_RE.search(text or "")
    return match.group(1) if match else None

# --- Discourse API helpers ---
def find_topic_by_uid_tag(s, uid_tag):
    """Return the id of the (first) topic carrying *uid_tag*, or None.

    A 404 means the tag does not exist yet; any other HTTP error is raised.
    """
    # timeout=30 for consistency with create_topic — an unbounded GET would
    # hang the hourly cron run forever on a stalled connection.
    r = s.get(f"{BASE}/tags/{uid_tag}.json", timeout=30)
    if r.status_code == 404:
        return None
    r.raise_for_status()
    data = r.json()
    topics = data.get("topic_list", {}).get("topics", [])
    if not topics:
        return None
    # The UID tag is unique per event, so the first topic is the synced one.
    return topics[0]["id"]

def read_topic(s, topic_id):
    """Fetch the full topic JSON (including post stream) for *topic_id*."""
    # timeout=30 for consistency with create_topic; avoids hanging cron runs.
    r = s.get(f"{BASE}/t/{topic_id}.json", timeout=30)
    r.raise_for_status()
    return r.json()

def create_topic(s, title, raw, category_id, tags):
    """Create a new topic; return (topic_id, first_post_id).

    *category_id* is honoured only here — updates never move categories.
    """
    payload = {
        "title": title,
        "raw": raw,
        "tags": tags or []
    }
    # Only include the category key when one is configured; sending
    # "category": null is not the same as omitting it.
    if category_id:
        payload["category"] = int(category_id)
    r = s.post(f"{BASE}/posts.json", json=payload, timeout=30)
    r.raise_for_status()
    data = r.json()
    return data["topic_id"], data["id"]

def update_topic_title_tags(s, topic_id, title=None, tags=None):
    """Update a topic's title and/or tags; no-op when both are None."""
    payload = {}
    if title is not None:
        payload["title"] = title
    if tags is not None:
        payload["tags"] = tags
    if not payload:
        return
    # timeout=30 for consistency with create_topic; avoids hanging cron runs.
    r = s.put(f"{BASE}/t/-/{topic_id}.json", json=payload, timeout=30)
    r.raise_for_status()

def update_first_post(s, post_id, new_raw, reason="ICS sync update"):
    """Replace the raw content of a post, recording *reason* as the edit reason."""
    # timeout=30 for consistency with create_topic; avoids hanging cron runs.
    r = s.put(f"{BASE}/posts/{post_id}.json", json={"raw": new_raw, "edit_reason": reason}, timeout=30)
    r.raise_for_status()

# --- Per-event processing ---
def process_vevent(s, vevent, args, feed_namespace):
    """Create or update the Discourse topic for a single VEVENT.

    Identity is the per-feed UID tag; human title edits are preserved via the
    hidden marker comparison; the category is only ever set on create.
    """
    uid = str(vevent.get('uid', '')).strip()
    if not uid:
        log.warning("Skipping event without UID")
        return

    # No RRULE expansion is done, so unexpanded recurrence masters are skipped.
    if _is_recurrence_master(vevent):
        log.info(f"Skipping RRULE master (no expansion) UID={uid}")
        return

    uid_tag = _build_uid_tag(feed_namespace, uid)

    # Human/default tags: sanitize and SKIP if too long; then add UID tag.
    extra_tags = _sanitize_tag_list_user(args.tags or [])
    default_tags = _sanitize_tag_list_user(DEFAULT_TAGS or [])
    tags = default_tags + extra_tags + [uid_tag]

    # De-dupe and sort for deterministic order
    tags = sorted(set(tags))

    if args.future_only:
        now = datetime.now(gettz(SITE_TZ))
        dtstart = _as_dt(vevent.decoded('dtstart'), SITE_TZ)
        # One-hour grace period so events that just started are still synced.
        if dtstart < now - timedelta(hours=1):
            return

    auto_title, fresh_body_no_marker = build_body(vevent, SITE_TZ, rsvp=args.rsvp)
    fresh_body = add_marker(fresh_body_no_marker, auto_title)

    topic_id = find_topic_by_uid_tag(s, uid_tag)
    if topic_id is None:
        # No existing topic for this UID: create path.
        if args.dry_run:
            log.info(f"[DRY] CREATE: {auto_title}  tags={tags}")
            return
        log.info(f"Creating new topic for UID {uid} …")
        created_topic_id, first_post_id = create_topic(s, auto_title, fresh_body, CATEGORY_ID, tags)
        log.info(f"Created topic #{created_topic_id}")
        return

    # Existing topic: diff the body, title and tags against the fresh render.
    topic = read_topic(s, topic_id)
    first_post = topic["post_stream"]["posts"][0]
    first_post_id = first_post["id"]
    old_raw = first_post["raw"]
    old_title_visible = topic["title"]
    old_marker_title = extract_marker_title(old_raw)

    # Compare bodies with the marker removed so the marker itself never
    # forces an update.
    old_raw_stripped = strip_marker(old_raw)
    need_post_update = (old_raw_stripped.strip() != fresh_body_no_marker.strip())

    # Only retitle when the visible title still matches the recorded
    # auto-title, i.e. no human has renamed the topic.
    can_update_title = (old_marker_title is not None and old_title_visible.strip() == old_marker_title.strip())
    need_title_update = (can_update_title and old_title_visible.strip() != auto_title.strip())

    old_tags = topic.get("tags", [])
    need_tags_update = (sorted(old_tags) != sorted(tags))

    if not (need_post_update or need_title_update or need_tags_update):
        log.info(f"No changes for UID {uid} (topic #{topic_id})")
        return

    if args.dry_run:
        what = []
        if need_post_update: what.append("post")
        if need_title_update: what.append("title")
        if need_tags_update: what.append("tags")
        log.info(f"[DRY] UPDATE ({', '.join(what)}): topic #{topic_id} -> {auto_title} tags={tags}")
        return

    log.info(f"Updating topic #{topic_id} for UID {uid} …")
    if need_post_update:
        update_first_post(s, first_post_id, fresh_body, reason="ICS sync update")
    if need_title_update or need_tags_update:
        update_topic_title_tags(
            s, topic_id,
            title=(auto_title if need_title_update else None),
            tags=(tags if need_tags_update else None)
        )
    log.info(f"Updated topic #{topic_id}")

# --- Main (category only used at CREATE, never on update) ---
def main():
    """CLI entry point: parse args, load the ICS feed, sync every VEVENT."""
    ap = argparse.ArgumentParser(
        description="Sync ICS feed into Discourse topics (create/update by UID)."
    )
    ap.add_argument("--ics-url", help="URL to ICS feed")
    ap.add_argument("--ics-file", help="Path to local .ics")
    ap.add_argument("--future-only", action="store_true", help="Only import future events")
    ap.add_argument("--rsvp", action="store_true", help="Use status=\"public\" instead of standalone")
    ap.add_argument("--dry-run", action="store_true", help="Print actions without calling the API")
    ap.add_argument("--skip-errors", action="store_true", help="Continue on event errors")
    ap.add_argument("--tags", help="Comma-separated extra tags to add", default="")
    ap.add_argument("--namespace", help="Namespace for UID tags (defaults to derived from feed URL or filename)")
    args = ap.parse_args()
    # Normalize --tags "a, b," -> ["a", "b"].
    args.tags = [t.strip() for t in (args.tags.split(",") if args.tags else []) if t.strip()]

    # Fail fast when the required Discourse credentials are absent.
    for var in ("DISCOURSE_BASE_URL", "DISCOURSE_API_KEY", "DISCOURSE_API_USERNAME"):
        if not os.environ.get(var):
            log.error(f"Missing env: {var}")
            sys.exit(1)

    if not args.ics_url and not args.ics_file:
        log.error("Provide --ics-url or --ics-file")
        sys.exit(1)

    # Determine source and derive namespace accordingly
    if args.ics_url:
        ics_kind = "url"
        ics_value = args.ics_url
        feed_namespace = _derive_namespace(args, ics_kind, ics_value)
        # Simple urllib fetch (no retries), as requested
        import urllib.request
        log.info(f"Fetching ICS: {args.ics_url}")
        req = urllib.request.Request(args.ics_url, headers={"User-Agent": "ics2disc/1.0"})
        with urllib.request.urlopen(req, timeout=30) as resp:
            data = resp.read()
    else:
        ics_kind = "file"
        ics_value = args.ics_file
        feed_namespace = _derive_namespace(args, ics_kind, ics_value)
        with open(args.ics_file, "rb") as f:
            data = f.read()

    log.info(f"Using namespace: {feed_namespace}")
    cal = Calendar.from_ical(data)
    s = _session()
    for comp in cal.walk("VEVENT"):
        try:
            process_vevent(s, comp, args, feed_namespace)
        except Exception as e:
            # --skip-errors: log this event and keep going; otherwise abort.
            if args.skip_errors:
                log.error(f"Error on event UID={comp.get('uid')}: {e}")
                continue
            raise

if __name__ == "__main__":
    main()

And here’s how I’ll run it every hour with cron:

0 * * * * /usr/bin/python3 /srv/ics2disc.py --ics-file /srv/calendar.ics --future-only

Note: --future-only is optional — it just avoids syncing past events. Also remember that the script reads its credentials from environment variables (DISCOURSE_BASE_URL, DISCOURSE_API_KEY, DISCOURSE_API_USERNAME) and exits immediately if they are missing, so set them in the crontab (or wrap the call in a small shell script that exports them).

Common cron schedules
Expression Meaning
*/15 * * * * Every 15 minutes
0 * * * * Every hour on the hour
0 6 * * * Once daily at 06:00
0 0 * * 0 Once a week, midnight Sunday
1 Like