Topic Voting Auto Self-Vote

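:information_source: Summary Automatically casts the author's vote when a user creates a topic
:hammer_and_wrench: Repository GitHub - dereklputnam/discourse-topic-voting-auto-self-vote
:question: Install Guide How to install a theme or theme component
:open_book: New to Discourse Themes? Beginner's guide to using Discourse Themes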

Install this theme component

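If your community limits the number of votes, as Meta does, having the OP's vote cast automatically may not be a good fit (see the discussion here). For communities with unlimited votes, though, this is a pain point: users may not even vote on the topics they created themselves! Hence this component.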

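Settings

  • Auto vote categories: If you don't want this enabled site-wide, specify the categories it should apply to here.
  • Excluded groups: If you don't want this applied to specific groups (for example, an internal team), add them here.
  • Auto vote on visit: A gentle way to clean up existing topics where the OP hasn't voted yet.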
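
To make the way the first two settings interact a bit more concrete, here is a rough sketch in Python. It is not the component's actual code (the component itself is a theme component and runs in the browser); the function name `should_auto_vote` and its parameters are made up for illustration. It assumes that an empty category list means "apply site-wide" and that membership in any excluded group opts the author out.

```python
from typing import Iterable, Optional, Set


def should_auto_vote(
    category_id: int,
    user_groups: Iterable[str],
    auto_vote_categories: Optional[Set[int]] = None,
    excluded_groups: Optional[Set[str]] = None,
) -> bool:
    """Return True if the author's vote would be cast automatically."""
    # Assumption: an empty (or unset) category list means "apply site-wide".
    if auto_vote_categories and category_id not in auto_vote_categories:
        return False
    # Assumption: belonging to any excluded group opts the author out.
    if excluded_groups and set(user_groups) & excluded_groups:
        return False
    return True


# Hypothetical category IDs and group names, for illustration only.
print(should_auto_vote(5, ["staff"], {5, 7}, {"staff"}))          # False
print(should_auto_vote(5, ["trust_level_1"], {5, 7}, {"staff"}))  # True
print(should_auto_vote(9, ["trust_level_1"]))                     # True
```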

Backfilling Missed Topics

Identify topics

Use this Data Explorer query to identify topics where the OP hasn't voted yet:

-- [params]
-- null category_id :category_id
-- null string :category_slug

SELECT
  t.id AS topic_id,
  u.username AS "Username",
  t.title AS "Topic Title",
  t.user_id AS "Author",
  t.created_at AS "Created At",
  c.name AS "Category Name",
  c.slug AS "Category Slug"
FROM topics t
JOIN users u ON u.id = t.user_id
JOIN categories c ON c.id = t.category_id
JOIN topic_voting_topic_vote_count tvvc ON tvvc.topic_id = t.id
LEFT JOIN topic_voting_votes tvv ON tvv.topic_id = t.id AND tvv.user_id = t.user_id
WHERE tvv.id IS NULL
  AND t.deleted_at IS NULL
  AND (
    :category_id IS NULL OR c.id = :category_id
  )
  AND (
    :category_slug IS NULL OR c.slug = :category_slug
  )

Tip: Re-run this query after updating topics to verify that the votes were cast :ballot_box:
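
If you export the query results as CSV before and after the update, a small comparison can save you from eyeballing the two lists. This is only a sketch: the helper names `topic_ids` and `compare_runs` are made up, and it assumes both exports contain a `topic_id` column with numeric IDs, as the query above produces.

```python
import csv
import io


def topic_ids(csv_file):
    """Collect the topic_id values from a CSV export."""
    return {row["topic_id"].strip() for row in csv.DictReader(csv_file)}


def compare_runs(before, after):
    """Compare two exports of the query: one taken before the update, one after."""
    before_ids, after_ids = topic_ids(before), topic_ids(after)
    return {
        # Listed before but gone now: the OP's vote has been cast.
        "fixed": sorted(before_ids - after_ids, key=int),
        # Listed in both runs: the OP still has not voted.
        "still_missing": sorted(before_ids & after_ids, key=int),
    }


# In-memory sample data standing in for two exported files.
before = io.StringIO("topic_id,username\n12345,john_doe\n12346,jane_smith\n")
after = io.StringIO("topic_id,username\n12346,jane_smith\n")
print(compare_runs(before, after))
```

With real exports, pass file objects opened with `open(path, newline='', encoding='utf-8')` instead of the `StringIO` samples.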

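Backfill via the API

Then take that list and backfill the missing votes with an API script! To use my script below, you need to delete every column except the first two. I kept the others in the Data Explorer query to make it easier to identify the topics in your community.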
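
If you'd rather not trim the columns by hand, something like the following should do it. It's a minimal sketch, not part of my original workflow: `trim_export` is a made-up helper, and it assumes the first two columns of the export are the topic ID and the username, in that order, as in the query above. It also rewrites the header to `topic_id,username`, which is the format the script's docstring describes.

```python
import csv
import io


def trim_export(src, dst):
    """Copy only the first two columns of a Data Explorer CSV export."""
    reader = csv.reader(src)
    writer = csv.writer(dst, lineterminator="\n")
    next(reader, None)  # drop the original header row
    writer.writerow(["topic_id", "username"])
    for row in reader:
        if len(row) >= 2:  # skip blank or malformed lines
            writer.writerow(row[:2])


# In-memory sample standing in for an exported file.
sample = io.StringIO(
    "topic_id,Username,Topic Title,Author,Created At,Category Name,Category Slug\n"
    '12345,john_doe,"Add dark mode, please",42,2024-01-01,Ideas,ideas\n'
)
out = io.StringIO()
trim_export(sample, out)
print(out.getvalue())
```

For real files, open both with `newline=''` and `encoding='utf-8'` and pass them in place of the `StringIO` objects.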

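Note: Your API key should be scoped to all users, since the script impersonates each topic author.

I don't have permission to attach the script I used, but here is the text: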

Backfill script
#!/usr/bin/env python3
"""
Backfill Self-Votes via Discourse API

This script casts votes for topic authors who haven't voted on their own topics.
It uses the Discourse API to impersonate users and cast votes on their behalf.

Requirements:
- Python 3.7+
- requests library (pip install requests)
- An admin API key with impersonation permissions

Usage:
1. Update the configuration section below
2. Prepare a CSV file with topic_id and username columns
3. Run: python backfill_votes_api.py

CSV Format:
    topic_id,username
    12345,john_doe
    12346,jane_smith
"""

import csv
import time
import requests

#==============================================================================
# CONFIGURATION
#==============================================================================

# Discourse instance URL (no trailing slash)
DISCOURSE_URL = 'https://community.netwrix.com'

# Admin API key (must have impersonation permissions)
API_KEY = 'YOUR_API_KEY_HERE'

# Admin username that owns the API key
API_USERNAME = 'system'

# Path to CSV file with topic_id and username columns
CSV_FILE = 'topics_to_vote.csv'

# Dry run mode - set to False to actually cast votes
DRY_RUN = True

# Delay between API calls (seconds) to avoid rate limiting
DELAY_BETWEEN_REQUESTS = 0.5

#==============================================================================
# SCRIPT
#==============================================================================

def cast_vote(topic_id: int, username: str) -> dict:
    """
    Cast a vote on a topic as a specific user.

    Args:
        topic_id: The ID of the topic to vote on
        username: The username to impersonate when voting

    Returns:
        dict with 'success' boolean and 'message' string
    """
    url = f"{DISCOURSE_URL}/voting/vote"

    headers = {
        'Api-Key': API_KEY,
        'Api-Username': username,  # Impersonate the user
        'Content-Type': 'application/json'
    }

    data = {
        'topic_id': topic_id
    }

    try:
        # Time out rather than hang forever on an unresponsive server
        response = requests.post(url, headers=headers, json=data, timeout=30)

        if response.status_code == 200:
            return {'success': True, 'message': 'Vote cast successfully'}
        elif response.status_code == 422:
            # Usually means already voted or voting not enabled
            return {'success': False, 'message': 'Already voted or voting not enabled'}
        elif response.status_code == 403:
            return {'success': False, 'message': 'Permission denied - check API key permissions'}
        elif response.status_code == 404:
            return {'success': False, 'message': 'Topic not found or voting not enabled on category'}
        else:
            return {'success': False, 'message': f'HTTP {response.status_code}: {response.text[:200]}'}

    except requests.RequestException as e:
        return {'success': False, 'message': f'Request error: {str(e)}'}

def main():
    print("\n" + "=" * 60)
    print("Backfill Self-Votes via API")
    print("=" * 60)
    print(f"Mode: {'DRY RUN (no changes)' if DRY_RUN else 'LIVE (votes will be cast)'}")
    print(f"Target: {DISCOURSE_URL}")
    print(f"CSV File: {CSV_FILE}")
    print("=" * 60 + "\n")

    # Read CSV file
    try:
        with open(CSV_FILE, 'r', newline='', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            rows = list(reader)
    except FileNotFoundError:
        print(f"ERROR: CSV file not found: {CSV_FILE}")
        print("\nCreate a CSV file with the following format:")
        print("topic_id,username")
        print("12345,john_doe")
        print("12346,jane_smith")
        return
    except Exception as e:
        print(f"ERROR: Failed to read CSV file: {e}")
        return

    if not rows:
        print("ERROR: CSV file is empty")
        return

    # Validate CSV columns - accept 'username', 'Username' (the header the
    # Data Explorer query exports), or 'Author Username'
    columns = set(rows[0].keys())
    username_col = next(
        (c for c in ('username', 'Username', 'Author Username') if c in columns),
        None,
    )

    if 'topic_id' not in columns or username_col is None:
        print("ERROR: CSV must have columns: topic_id and (username, Username, or 'Author Username')")
        print(f"Found columns: {columns}")
        return

    print(f"Found {len(rows)} topics to process\n")

    # Process each row
    success_count = 0
    skip_count = 0
    error_count = 0

    for i, row in enumerate(rows, 1):
        topic_id = row['topic_id'].strip()
        # Read from the column detected above; tolerate short rows (None values)
        username = (row[username_col] or '').strip()

        print(f"[{i}/{len(rows)}] Topic #{topic_id} by @{username}", end=" ")

        if DRY_RUN:
            print("-> would vote")
            success_count += 1
        else:
            result = cast_vote(int(topic_id), username)

            if result['success']:
                print("-> voted!")
                success_count += 1
            elif 'Already voted' in result['message']:
                print("-> already voted (skipped)")
                skip_count += 1
            else:
                print(f"-> ERROR: {result['message']}")
                error_count += 1

            # Rate limiting
            if i < len(rows):
                time.sleep(DELAY_BETWEEN_REQUESTS)

    # Summary
    print("\n" + "=" * 60)
    print("SUMMARY")
    print("=" * 60)
    print(f"Total topics: {len(rows)}")
    print(f"Votes {'to cast' if DRY_RUN else 'cast'}: {success_count}")
    print(f"Already voted (skipped): {skip_count}")
    print(f"Errors: {error_count}")

    if DRY_RUN:
        print("\n** DRY RUN COMPLETE **")
        print("To cast votes, set DRY_RUN = False and run again.")

    print("")


if __name__ == '__main__':
    main()