Just to update on my own setup:
My network at IONOS is not flaky, so I won’t be needing Change 2 (the retry/backoff logic).
The rest of the changes are still useful in my case.
Here’s the script I will be using (post #20 with changes 1, 3, 4 and 5 from #21 applied):
ics2disc.py
#!/usr/bin/env python3
# Sync ICS -> Discourse topics (create/update by UID)
# Preserves human-edited titles; never moves categories on update.
# Requirements: requests, python-dateutil, icalendar
import os, sys, argparse, re, logging, hashlib
from datetime import datetime, date, timedelta
from dateutil.tz import gettz
from icalendar import Calendar
from urllib.parse import urlparse
import requests
# Module-wide logger; basicConfig routes INFO+ messages to stderr
# in a terse "LEVEL: message" format suitable for cron mail.
log = logging.getLogger("ics2disc")
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
# --- Config from environment ---
BASE = os.environ.get("DISCOURSE_BASE_URL", "").rstrip("/")  # forum root URL, trailing slash stripped
API_KEY = os.environ.get("DISCOURSE_API_KEY")  # Discourse API key (required; checked in main())
API_USER = os.environ.get("DISCOURSE_API_USERNAME", "system")  # username the API acts as
CATEGORY_ID = os.environ.get("DISCOURSE_CATEGORY_ID") # numeric (string ok) - used on CREATE only
DEFAULT_TAGS = [t for t in os.environ.get("DISCOURSE_DEFAULT_TAGS", "").split(",") if t]  # comma-separated list
SITE_TZ = os.environ.get("SITE_TZ", "Europe/London")  # timezone written into [event] tags
# Prefer Meta-style env name; fall back to TAG_MAX_LEN; default 30.
TAG_MAX_LEN = int(os.environ.get("DISCOURSE_TAG_MAX_LEN", os.environ.get("TAG_MAX_LEN", "30")))
# --- HTTP helpers (Discourse API) ---
def _session():
    """Return a requests.Session pre-loaded with the Discourse auth headers."""
    auth_headers = {
        "Api-Key": API_KEY,
        "Api-Username": API_USER,
        "Content-Type": "application/json",
    }
    sess = requests.Session()
    sess.headers.update(auth_headers)
    return sess
# --- Tag helpers (namespace + safe names + length handling) ---
# Runs of characters not valid in a Discourse tag (anything outside a-z, 0-9, '-').
_TAG_SAFE_RE = re.compile(r"[^a-z0-9\-]+")
# Two or more consecutive dashes, to be collapsed into one.
_TAG_DASHES_RE = re.compile(r"-{2,}")
def _short_hash(text, n=10):
    """Return the first *n* hex digits of SHA-1(text); None is treated as ''."""
    digest = hashlib.sha1((text or "").encode("utf-8")).hexdigest()
    return digest[:n]
def _sanitize_tag_base(s: str) -> str:
    """Lowercase, replace invalid chars with '-', squeeze dashes, trim."""
    cleaned = (s or "").strip().lower()
    # Anything outside [a-z0-9-] becomes a dash, then dash runs collapse.
    cleaned = re.sub(r"[^a-z0-9\-]+", "-", cleaned)
    cleaned = re.sub(r"-{2,}", "-", cleaned).strip("-")
    return cleaned if cleaned else "event"
def _enforce_len_or_truncate(tag: str) -> str:
    """Truncate *safely* (used only for our internal UID tag).

    Overlong tags are shortened to TAG_MAX_LEN with a '-h' + 10-hex-char
    suffix so distinct inputs stay distinct after truncation.
    """
    if len(tag) > TAG_MAX_LEN:
        marker = "-h" + _short_hash(tag, 10)
        head_len = max(1, TAG_MAX_LEN - len(marker))
        head = tag[:head_len].rstrip("-")
        return (head + marker)[:TAG_MAX_LEN]
    return tag
def _sanitize_user_tag_or_skip(tag: str):
    """
    For human/default tags: sanitize, then if still too long -> skip with warning.
    This matches the "don't silently mutate user tags" guidance.
    Returns the sanitized tag, or None when it had to be skipped.
    """
    st = _sanitize_tag_base(tag)
    if len(st) <= TAG_MAX_LEN:
        return st
    log.warning(f"Skipping overlong tag (> {TAG_MAX_LEN}): {st}")
    return None
def _sanitize_tag_list_user(tags):
    """Sanitize each user/default tag, dropping any that were skipped."""
    candidates = (_sanitize_user_tag_or_skip(t) for t in (tags or []))
    return [c for c in candidates if c]
def _derive_namespace(args, ics_source_kind, ics_source_value) -> str:
    """
    Namespace priority:
      1) --namespace (CLI)
      2) ICS_NAMESPACE (env)
      3) Derived from URL host+tail or local filename stem
    The result is always sanitized and length-limited.
    """
    def _finish(raw):
        # Common tail: tag-sanitize then enforce the max tag length.
        return _enforce_len_or_truncate(_sanitize_tag_base(raw))

    cli_ns = getattr(args, "namespace", None)
    if cli_ns:
        return _finish(cli_ns)
    env_ns = os.environ.get("ICS_NAMESPACE")
    if env_ns:
        return _finish(env_ns)
    if ics_source_kind == "url":
        parsed = urlparse(ics_source_value)
        host = (parsed.netloc or "ics").replace(".", "-")
        segments = [p for p in (parsed.path or "").split("/") if p]
        tail = segments[-1] if segments else "feed"
        return _finish(f"{host}-{tail}")
    stem = os.path.splitext(os.path.basename(ics_source_value))[0] or "ics"
    return _finish(stem)
def _build_uid_tag(namespace: str, uid: str) -> str:
    """Build the per-event tracking tag: '<namespace>-uid-<sha1 prefix>'.

    The final tag is sanitized and length-enforced so it is always a
    valid Discourse tag.
    """
    raw = f"{namespace}-uid-{_short_hash(uid, 10)}"
    return _enforce_len_or_truncate(_sanitize_tag_base(raw))
# --- Time helpers ---
def _as_dt(value, site_tz):
    """Coerce an icalendar-decoded value to a timezone-aware datetime.

    Plain dates become midnight in *site_tz*; naive datetimes get *site_tz*
    attached; aware datetimes pass through. Raises TypeError otherwise.
    """
    tz = gettz(site_tz)
    # datetime is a subclass of date, so it must be tested first.
    if isinstance(value, datetime):
        return value.replace(tzinfo=tz) if value.tzinfo is None else value
    if isinstance(value, date):
        return datetime(value.year, value.month, value.day, 0, 0, 0, tzinfo=tz)
    raise TypeError(f"Unsupported dt value type: {type(value)}")
def _is_all_day(vevent):
    """True when DTSTART is a DATE (all-day event) rather than a DATE-TIME.

    Checks the VALUE=DATE property parameter first, then falls back to the
    decoded Python type.
    """
    prop = vevent.get('dtstart')
    if not prop:
        return False
    try:
        params = getattr(prop, 'params', {})
        if params.get('VALUE') == 'DATE':
            return True
    except Exception:
        # Property parameters are best-effort; fall through to type check.
        pass
    decoded = vevent.decoded('dtstart', None)
    return isinstance(decoded, date) and not isinstance(decoded, datetime)
def _fmt_iso_z(dt):
    """Format an aware datetime as UTC ISO-8601 with a 'Z' suffix."""
    utc_dt = dt.astimezone(gettz('UTC'))
    return utc_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
def _is_recurrence_master(vevent):
    """True for an RRULE "master" event with no RECURRENCE-ID.

    These are skipped because this script does not expand recurring series.
    """
    has_rule = bool(vevent.get('rrule'))
    is_instance = bool(vevent.get('recurrence-id'))
    return has_rule and not is_instance
# --- Body builder ([event] BBCode) ---
def build_body(vevent, site_tz, rsvp=False):
    """Build the topic body as discourse-calendar [event] BBCode.

    Returns (auto_title, body). The title sits between the [event] tags;
    description (if any) is appended after a horizontal rule.
    """
    def _attr(value):
        # [event] attributes are delimited by double quotes and must stay on
        # one line; a '"' or newline in LOCATION/URL would otherwise break
        # the tag. Neutralise them without altering normal values.
        return value.replace('"', "'").replace("\r", " ").replace("\n", " ")

    title = str(vevent.get('summary', 'Untitled')).strip() or "Untitled"
    desc = str(vevent.get('description', '')).strip()
    url = str(vevent.get('url', '')).strip()
    location = str(vevent.get('location', '')).strip()
    allday = _is_all_day(vevent)
    dtstart_raw = vevent.decoded('dtstart')
    dtend_raw = vevent.decoded('dtend', None)
    start_dt = _as_dt(dtstart_raw, site_tz)
    # Missing DTEND: default all-day events to one day, timed events to one hour.
    if dtend_raw is None:
        dtend_raw = (start_dt + (timedelta(days=1) if allday else timedelta(hours=1)))
    end_dt = _as_dt(dtend_raw, site_tz)
    status = "public" if rsvp else "standalone"
    if allday:
        start_attr = start_dt.strftime("%Y-%m-%d")
        # ICS all-day DTEND is exclusive; the plugin expects an inclusive date.
        if (end_dt - start_dt) >= timedelta(days=1):
            end_attr = (end_dt - timedelta(days=1)).strftime("%Y-%m-%d")
        else:
            end_attr = start_attr
        event_open = f'[event status="{status}" timezone="{site_tz}" start="{start_attr}" end="{end_attr}"'
    else:
        event_open = f'[event status="{status}" timezone="{site_tz}" start="{_fmt_iso_z(start_dt)}" end="{_fmt_iso_z(end_dt)}"'
    if location:
        event_open += f' location="{_attr(location)}"'
    if url:
        event_open += f' url="{_attr(url)}"'
    event_open += ' minimal="true"]'
    lines = [event_open, title, '[/event]']
    if desc:
        lines += ["", "---", "", desc]
    body = "\n".join(lines).strip()
    return title, body
# --- Marker to preserve human title edits ---
# Hidden HTML comment appended to the first post. It records the last
# auto-generated title so a later sync can tell whether a human renamed
# the topic (visible title != marker title => leave the title alone).
MARKER_RE = re.compile(r'<!--\s*ics-sync:title="(.*?)"\s*-->')
def add_marker(body, auto_title):
    """Append the hidden ics-sync title marker to *body*."""
    return f'{body}\n\n<!-- ics-sync:title="{auto_title}" -->'.strip()
def strip_marker(text):
    """Return *text* with the hidden ics-sync marker removed and trimmed."""
    without = re.sub(r'<!--\s*ics-sync:title="(.*?)"\s*-->', "", text or "")
    return without.strip()
def extract_marker_title(text):
    """Return the title stored in the hidden marker, or None when absent."""
    found = re.search(r'<!--\s*ics-sync:title="(.*?)"\s*-->', text or "")
    return found.group(1) if found else None
# --- Discourse API helpers ---
def find_topic_by_uid_tag(s, uid_tag):
    """Return the id of the first topic carrying *uid_tag*, or None.

    The UID tag is generated per event, so it is expected to match at most
    one topic; topics[0] relies on that. A 404 means the tag does not exist
    yet (event never synced).
    """
    # Timeout added for consistency with create_topic; without it a stalled
    # connection would hang the hourly cron run indefinitely.
    r = s.get(f"{BASE}/tags/{uid_tag}.json", timeout=30)
    if r.status_code == 404:
        return None
    r.raise_for_status()
    data = r.json()
    topics = data.get("topic_list", {}).get("topics", [])
    if not topics:
        return None
    return topics[0]["id"]
def read_topic(s, topic_id):
    """Fetch the full topic JSON (includes post_stream with the first post).

    Raises requests.HTTPError on a non-2xx response.
    """
    # Timeout added so a hung connection cannot stall the sync (matches
    # the timeout already used in create_topic).
    r = s.get(f"{BASE}/t/{topic_id}.json", timeout=30)
    r.raise_for_status()
    return r.json()
def create_topic(s, title, raw, category_id, tags):
    """Create a new topic via POST /posts.json.

    category_id is applied only here (never on update). Returns
    (topic_id, first_post_id). Raises requests.HTTPError on failure.
    """
    payload = {
        "title": title,
        "raw": raw,
        "tags": tags or []
    }
    # Only send a category when one is configured; previously the payload
    # carried an explicit "category": null, which is better omitted so the
    # server applies its own default.
    if category_id:
        payload["category"] = int(category_id)
    r = s.post(f"{BASE}/posts.json", json=payload, timeout=30)
    r.raise_for_status()
    data = r.json()
    return data["topic_id"], data["id"]
def update_topic_title_tags(s, topic_id, title=None, tags=None):
    """Update a topic's title and/or tags via PUT /t/-/{id}.json.

    Only the supplied fields are sent; calling with neither is a no-op.
    The category is deliberately never touched here (create-only policy).
    """
    payload = {}
    if title is not None:
        payload["title"] = title
    if tags is not None:
        payload["tags"] = tags
    if not payload:
        return
    # Timeout added for consistency with create_topic; a stalled connection
    # must not hang the cron run.
    r = s.put(f"{BASE}/t/-/{topic_id}.json", json=payload, timeout=30)
    r.raise_for_status()
def update_first_post(s, post_id, new_raw, reason="ICS sync update"):
    """Replace the raw body of a post (PUT /posts/{id}.json) with an edit reason."""
    # Timeout added for consistency with the other API helpers.
    r = s.put(
        f"{BASE}/posts/{post_id}.json",
        json={"raw": new_raw, "edit_reason": reason},
        timeout=30,
    )
    r.raise_for_status()
# --- Per-event processing ---
def process_vevent(s, vevent, args, feed_namespace):
    """Create or update one Discourse topic for a single VEVENT.

    Flow: skip UID-less events and RRULE masters, build the tag set
    (defaults + CLI extras + the per-UID tracking tag), optionally drop
    past events, then either create a new topic or diff the existing one
    and update only what actually changed (post body, title, tags).
    """
    uid = str(vevent.get('uid', '')).strip()
    if not uid:
        # Without a UID there is no stable identity to sync against.
        log.warning("Skipping event without UID")
        return
    if _is_recurrence_master(vevent):
        log.info(f"Skipping RRULE master (no expansion) UID={uid}")
        return
    # Deterministic per-event tag; this is how the topic is found on later runs.
    uid_tag = _build_uid_tag(feed_namespace, uid)
    # Human/default tags: sanitize and SKIP if too long; then add UID tag.
    extra_tags = _sanitize_tag_list_user(args.tags or [])
    default_tags = _sanitize_tag_list_user(DEFAULT_TAGS or [])
    tags = default_tags + extra_tags + [uid_tag]
    # De-dupe and sort for deterministic order
    tags = sorted(set(tags))
    if args.future_only:
        now = datetime.now(gettz(SITE_TZ))
        dtstart = _as_dt(vevent.decoded('dtstart'), SITE_TZ)
        # One-hour grace period so events that just started are still synced.
        if dtstart < now - timedelta(hours=1):
            return
    auto_title, fresh_body_no_marker = build_body(vevent, SITE_TZ, rsvp=args.rsvp)
    fresh_body = add_marker(fresh_body_no_marker, auto_title)
    topic_id = find_topic_by_uid_tag(s, uid_tag)
    if topic_id is None:
        # --- CREATE path (category is applied only here, never on update) ---
        if args.dry_run:
            log.info(f"[DRY] CREATE: {auto_title} tags={tags}")
            return
        log.info(f"Creating new topic for UID {uid} …")
        created_topic_id, first_post_id = create_topic(s, auto_title, fresh_body, CATEGORY_ID, tags)
        log.info(f"Created topic #{created_topic_id}")
        return
    # --- UPDATE path: read current state and compute a minimal diff ---
    topic = read_topic(s, topic_id)
    first_post = topic["post_stream"]["posts"][0]
    first_post_id = first_post["id"]
    old_raw = first_post["raw"]
    old_title_visible = topic["title"]
    old_marker_title = extract_marker_title(old_raw)
    old_raw_stripped = strip_marker(old_raw)
    # Compare bodies with the hidden marker stripped, so the marker itself
    # never forces a post edit.
    need_post_update = (old_raw_stripped.strip() != fresh_body_no_marker.strip())
    # Only retitle when the visible title still equals the last auto title;
    # a mismatch means a human renamed the topic and their edit is preserved.
    can_update_title = (old_marker_title is not None and old_title_visible.strip() == old_marker_title.strip())
    need_title_update = (can_update_title and old_title_visible.strip() != auto_title.strip())
    old_tags = topic.get("tags", [])
    need_tags_update = (sorted(old_tags) != sorted(tags))
    if not (need_post_update or need_title_update or need_tags_update):
        log.info(f"No changes for UID {uid} (topic #{topic_id})")
        return
    if args.dry_run:
        what = []
        if need_post_update: what.append("post")
        if need_title_update: what.append("title")
        if need_tags_update: what.append("tags")
        log.info(f"[DRY] UPDATE ({', '.join(what)}): topic #{topic_id} -> {auto_title} tags={tags}")
        return
    log.info(f"Updating topic #{topic_id} for UID {uid} …")
    if need_post_update:
        update_first_post(s, first_post_id, fresh_body, reason="ICS sync update")
    if need_title_update or need_tags_update:
        update_topic_title_tags(
            s, topic_id,
            title=(auto_title if need_title_update else None),
            tags=(tags if need_tags_update else None)
        )
    log.info(f"Updated topic #{topic_id}")
# --- Main (category only used at CREATE, never on update) ---
def main():
    """CLI entry point: validate config, load the ICS feed, sync every VEVENT."""
    ap = argparse.ArgumentParser(
        description="Sync ICS feed into Discourse topics (create/update by UID)."
    )
    ap.add_argument("--ics-url", help="URL to ICS feed")
    ap.add_argument("--ics-file", help="Path to local .ics")
    ap.add_argument("--future-only", action="store_true", help="Only import future events")
    ap.add_argument("--rsvp", action="store_true", help="Use status=\"public\" instead of standalone")
    ap.add_argument("--dry-run", action="store_true", help="Print actions without calling the API")
    ap.add_argument("--skip-errors", action="store_true", help="Continue on event errors")
    ap.add_argument("--tags", help="Comma-separated extra tags to add", default="")
    ap.add_argument("--namespace", help="Namespace for UID tags (defaults to derived from feed URL or filename)")
    args = ap.parse_args()
    # Normalize --tags from a comma-separated string into a clean list.
    args.tags = [t.strip() for t in (args.tags.split(",") if args.tags else []) if t.strip()]
    # Fail fast when required connection settings are missing.
    for var in ("DISCOURSE_BASE_URL", "DISCOURSE_API_KEY", "DISCOURSE_API_USERNAME"):
        if not os.environ.get(var):
            log.error(f"Missing env: {var}")
            sys.exit(1)
    if not args.ics_url and not args.ics_file:
        log.error("Provide --ics-url or --ics-file")
        sys.exit(1)
    # Determine source and derive namespace accordingly
    if args.ics_url:
        ics_kind = "url"
        ics_value = args.ics_url
        feed_namespace = _derive_namespace(args, ics_kind, ics_value)
        # Simple urllib fetch (no retries), as requested
        import urllib.request
        log.info(f"Fetching ICS: {args.ics_url}")
        req = urllib.request.Request(args.ics_url, headers={"User-Agent": "ics2disc/1.0"})
        with urllib.request.urlopen(req, timeout=30) as resp:
            data = resp.read()
    else:
        ics_kind = "file"
        ics_value = args.ics_file
        feed_namespace = _derive_namespace(args, ics_kind, ics_value)
        with open(args.ics_file, "rb") as f:
            data = f.read()
    log.info(f"Using namespace: {feed_namespace}")
    cal = Calendar.from_ical(data)
    s = _session()
    for comp in cal.walk("VEVENT"):
        try:
            process_vevent(s, comp, args, feed_namespace)
        except Exception as e:
            # --skip-errors: log the failing event and keep going;
            # otherwise abort the whole run.
            if args.skip_errors:
                log.error(f"Error on event UID={comp.get('uid')}: {e}")
                continue
            raise

if __name__ == "__main__":
    main()
And here’s how I’ll run it every hour with cron (note: cron does not inherit your shell environment, so the required DISCOURSE_* variables must be set in the crontab itself or in a small wrapper script):
0 * * * * /usr/bin/python3 /srv/ics2disc.py --ics-file /srv/calendar.ics --future-only
Note: --future-only is optional — it just avoids syncing past events.
Common cron schedules
| Expression | Meaning |
|---|---|
| `*/15 * * * *` | Every 15 minutes |
| `0 * * * *` | Every hour on the hour |
| `0 6 * * *` | Once daily at 06:00 |
| `0 0 * * 0` | Once a week, midnight Sunday |