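# Source: Catalan Wikipedia (Viquipèdia), user page "Usuari:CobainBot/scripts/cawiki/creators.py".
# The original page header ("Vés al contingut" / "De la Viquipèdia, l'enciclopèdia lliure") was web-page
# navigation text; it is kept here as a comment so that the file parses as Python.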
import pickle
import re
import sys
import time
from argparse import ArgumentParser
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from dateutil.relativedelta import relativedelta
from pathlib import Path
from requests.exceptions import ConnectionError
from typing import Dict, List

# pywikibot
from pywikibot import Page, Site
from pywikibot.data import api

"""
Translation quality checker.

A user is considered to have created an article when the article appears in their contributions as a new page
and it was not created as a redirect.

An article counts against its creator when its current content uses any of the maintenance templates listed in
the ``warning_templates`` tuple of ``Article``.
"""

# Module-wide state.
# Shared pywikibot Site handle, built for code 'ca' and family 'wikipedia'; the third argument 'CobainBot' is
# presumably the bot account's username (confirm against the pywikibot.Site signature).
# NOTE: both statements run at import time, so merely importing this module performs a login.
site = Site('ca', 'wikipedia', 'CobainBot')
site.login()


class Template(Page):
    """A template page together with the parameters of one of its uses.

    ``name`` is the title of the wrapped page without namespace and without section; ``parameters`` maps each
    parameter name to its value once :meth:`set_parameters` has been called.
    """

    def __init__(self, page: Page):
        super().__init__(page)
        self.name = page.title(with_ns=False, with_section=False, without_brackets=False)
        self.parameters = {}

    def set_parameters(self, parameters: List[str]):
        """Replace ``self.parameters`` with the parsed form of *parameters*.

        Each item is split at its first ``=`` into name and value; an item without ``=`` is stored under its own
        text with an empty value. When a name repeats, the last occurrence wins.

        :param parameters: raw parameter strings, e.g. ``['1=foo', 'bar']``.
        """
        parsed = {}
        for raw in parameters:
            # partition() yields (raw, '', '') when there is no '=', i.e. the same (name, '') pair.
            key, _, value = raw.partition('=')
            parsed[key] = value
        self.parameters = parsed

    def __repr__(self):
        return '<Template "{}" {}>'.format(self.name, self.parameters)


class Revision:
    """Plain record describing a single page revision.

    A fresh instance carries neutral defaults; :meth:`set` fills it from a revision mapping.
    ``is_redir``, ``is_disambig`` and ``is_ContentTranslate`` are never derived here: they stay ``False`` until a
    caller assigns them.
    """

    def __init__(self):
        self.id = 0
        self.parent_id = 0
        self.size = 0
        self.content = ''
        self.username = ''
        self.user_id = 0
        self.comment = ''
        self.tags = []
        self.timestamp = None
        self.is_anon = False
        self.is_minor = False
        self.is_redir = False
        self.is_disambig = False
        self.is_ContentTranslate = False

    def set(self, rev_dict):
        """Copy the fields of a revision mapping into this object.

        Required keys: ``revid``, ``parentid``, ``size``, ``timestamp`` and ``slots`` (with a ``main`` slot).
        Optional keys: ``userid``, ``user``, ``comment``, ``tags``, ``anon`` and ``minor``. When one of them is
        absent the constructor default is used instead of raising ``KeyError``; when present, the value is stored
        exactly as given.

        :param rev_dict: mapping with a ``get`` method describing one revision.
        :raises KeyError: if a required key is missing, or if the main slot has neither ``texthidden`` nor ``*``.
        """
        main_slot = rev_dict['slots']['main']

        self.id = rev_dict['revid']
        self.parent_id = rev_dict['parentid']
        self.size = rev_dict['size']
        # A slot flagged 'texthidden' carries no text under '*'; store an empty string in that case.
        self.content = '' if 'texthidden' in main_slot else main_slot['*']
        self.user_id = rev_dict.get('userid', 0)
        self.username = rev_dict.get('user', '')
        self.comment = rev_dict.get('comment', '')
        self.tags = rev_dict.get('tags', [])
        # 'anon' and 'minor' are flag-style keys that may be absent altogether; absence means "not set".
        self.is_anon = rev_dict.get('anon', False)
        self.is_minor = rev_dict.get('minor', False)
        self.timestamp = rev_dict['timestamp']


class Article(Page):
    """Page on the module-level ``site`` plus bookkeeping about the templates it uses.

    ``template_list`` holds every template found by :meth:`set_templates`; ``found_templates`` and
    ``found_countable_template`` are filled by callers through :meth:`add_template` and
    :meth:`add_countable_template`.
    """

    # Template names that callers compare against ``Template.name`` and record with add_template().
    # Kept at class level so the tuples are built once instead of once per Article.
    warning_templates = (
        "Falten referències", "FVA", "Millorar traducció", "Millorar bibliografia", "No s'entén", "Imprecís",
        "Millorar ortografia", "Expert", "Segona llegida", "Millorar introducció", "Prosa", "Millorar",
        "Millores múltiples", "Condensar", "Biaix de gènere", "Fonts primàries", "Error de gènere", "Currículum",
        "Massa vegeu també", "Millorar format", "Millorar estructura", "Duplicat", "Moure", "Moure a Viccionari",
        "Moure a Viquidites", "Moure a Viquillibres", "Moure a Viquinotícies", "Moure a Viquitexts",
        "Moure a Wikidata", "Recerca original"
    )
    # Template names that callers record per occurrence with add_countable_template().
    # NOTE(review): "Imprecís" is also listed in warning_templates; confirm which tuple it belongs to.
    warning2_templates = (
        "Cal citació", "Font qüestionable", "Imprecís", "Verifica la citació", "Format ref", "Tinv"
    )

    def __init__(self, title):
        Page.__init__(self, site, title)
        self.template_list: List[Template] = []
        self.found_templates: List[Template] = []
        self.found_countable_template: Dict[Template, int] = {}
        self.is_content_translate = False

    def set_templates(self):
        """Rebuild ``template_list`` from ``templatesWithParams()``.

        The list is replaced rather than extended, so calling this method again does not duplicate entries.
        """
        collected = []
        for template_page, parameters in self.templatesWithParams():
            template = Template(template_page)
            template.set_parameters(parameters)
            collected.append(template)
        self.template_list = collected

    def _single_revision(self, **options):
        """Return the first item of ``self.revisions(total=1, content=True, **options)`` as a ``Revision``."""
        revision = Revision()
        revision.set(list(self.revisions(total=1, content=True, **options))[0])
        return revision

    @property
    def first_edit(self):
        """
        Corresponds to pywikibot.Page.oldest_revision but its data type is Revision.

        Every access performs a new ``revisions()`` lookup; keep the result in a local when it is needed twice.
        :return: the oldest revision, including its content.
        """
        return self._single_revision(reverse=True)

    @property
    def last_edit(self):
        """
        Corresponds to pywikibot.Page.latest_revision but its data type is Revision.

        Every access performs a new ``revisions()`` lookup; keep the result in a local when it is needed twice.
        :return: the newest revision, including its content.
        """
        return self._single_revision()

    def add_template(self, template: Template):
        """Append *template* to ``found_templates``."""
        self.found_templates.append(template)

    def add_countable_template(self, template: Template):
        """Increase the occurrence count of *template* in ``found_countable_template`` (starting at 1)."""
        self.found_countable_template[template] = self.found_countable_template.get(template, 0) + 1


class Creator:
    """Per-user tally of page creations.

    The identity fields are filled by :meth:`set`; the counters start at zero and are updated from outside the
    class. ``raw_articles`` and ``last_log_timestamp`` are initialised here but not touched by any method of this
    class.
    """

    def __init__(self):
        # Identity, as copied from a user mapping by set().
        self.id = 0
        self.name = ''
        self.edit_count = 0
        self.groups: List[str] = []
        # Bookkeeping fields that only receive their initial value here.
        self.raw_articles = 0
        self.last_log_timestamp = None
        # Creation counters.
        self.redirects = 0
        self.disambigs = 0
        self.creations = 0
        self.old_article_is_current_redirect = 0
        self.content_translate_articles = 0
        self.weak_articles = 0

    def __repr__(self):
        total_created = self.creations + self.disambigs + self.redirects
        return f'<User name: "{self.name}" ec: {self.edit_count} cr: {total_created}>'

    @property
    def creation_average(self):
        """Share of ``creations`` that are ``content_translate_articles``; 0 when there are no creations."""
        if self.creations:
            return self.content_translate_articles / self.creations
        return 0

    @property
    def weakness_average(self):
        """Share of ``content_translate_articles`` that are ``weak_articles``; 0 when there are none."""
        if self.content_translate_articles:
            return self.weak_articles / self.content_translate_articles
        return 0

    def set(self, user_dict):
        """
        Copy identity data from a user mapping.

        :param user_dict: mapping providing the keys 'userid', 'name', 'editcount' and 'groups'.
        :return: None
        """
        self.id, self.name = user_dict['userid'], user_dict['name']
        self.edit_count, self.groups = user_dict['editcount'], user_dict['groups']

    def set_content_translate_articles(self, articles: List[Article]):
        """Store how many of *articles* have ``is_content_translate`` set and return those articles as a list."""
        translated = []
        for candidate in articles:
            if candidate.is_content_translate:
                translated.append(candidate)
        self.content_translate_articles = len(translated)
        return translated

    def set_weak_articles(self, content_translate_articles: List[Article]):
        """Store how many of the given articles have a non-empty ``found_templates``."""
        self.weak_articles = sum(1 for candidate in content_translate_articles if candidate.found_templates)


class Promise:
    """Small value holder pairing a text ``message`` with a ``loops`` counter."""

    def __init__(self, message='', loops=0):
        self.message, self.loops = message, loops


class ArticleCreators:
    """Collect page-creation statistics for experienced, recently active users.

    Workflow (see :meth:`run`): list the local users, keep those accepted by :meth:`get_user`, then inspect every
    main-namespace page each of them created (:meth:`get_user_contribs`), several users in parallel.
    ``users`` holds the candidates, ``ready_users`` the ones already inspected; both lists are pickled under
    ``resources/`` so that an interrupted run can be resumed with :meth:`resume`.

    NOTE(review): the counters (``queries``, ``threads``, ``buffer_size`` ...) and :meth:`backup` are used from
    worker threads without any locking.
    """

    _USERS_FILE = 'resources/creators.bin'
    _READY_USERS_FILE = 'resources/checked_creators.bin'
    _MAX_ATTEMPTS = 3   # attempts per API request before query() gives up
    _MAX_WORKERS = 20   # size of the thread pool used by all_contribs()
    _DISAMBIG_RE = re.compile(r'\{\{\s*[Dd]esambiguació\s*(\|.*?)??\}\}')

    def __init__(self):
        self.site = site
        self.page = Page(self.site, "Viquipèdia:Llista d'usuaris per creació d'articles")
        self.users: List[Creator] = []
        self.ready_users: List[Creator] = []
        self.ct_tags = ('contenttranslation', 'contenttranslation-v2')
        self.queries = 0
        self.start_time = datetime.now()
        self.end_time = None
        self.checked_users = 0
        self.threads = 0
        self.buffer_size = 0
        self.limit = 500
        self.retries = 0

    # ------------------------------------------------------------------ helpers

    def _status_line(self, label):
        """Return a timestamped progress line: *label* followed by the current counters and elapsed time."""
        now = datetime.now()
        et = relativedelta(now, self.start_time)
        return (
            f"[{now:%H:%M:%S}] {label} *** users: {self.checked_users} "
            f"queries: {self.queries} threads: {self.threads} ready: {len(self.ready_users)} "
            f"buffer: {self.buffer_size} [{et.days}:{et.hours}:{et.minutes}:{et.seconds}]"
        )

    @staticmethod
    def _created_pages_params(username, limit):
        """Return the usercontribs parameters that list the main-namespace pages created by *username*."""
        return {
            'action': 'query',
            'list': 'usercontribs',
            'ucuser': username,
            'ucshow': 'new',
            'ucprop': 'title',
            'ucnamespace': 0,
            'uclimit': limit,
            # Sent explicitly (as _all_users does) because the pagination code reads 'query-continue'.
            'rawcontinue': '',
        }

    def _iter_batches(self, params, list_name, continue_key, throttle_user=None):
        """Yield the result list of every page of a paginated ``list=`` query.

        :param params: request parameters; the dict passed in is not modified.
        :param list_name: key of the result list inside ``data['query']`` and ``data['query-continue']``.
        :param continue_key: continuation parameter to copy from the response into the next request.
        :param throttle_user: when given, :meth:`check_wait` is called with it before every request.
        """
        request = dict(params)
        while True:
            if throttle_user is not None:
                self.check_wait(throttle_user)
            data = self.query(request)
            yield data['query'][list_name]
            token = data['query-continue'][list_name][continue_key] if 'query-continue' in data else None
            if not token:
                return
            request[continue_key] = token

    @staticmethod
    def _is_redirect(text):
        """Tell whether *text* contains the ``#REDIRECT`` keyword in any letter case.

        NOTE(review): localized aliases of the keyword, if the wiki defines any, are not recognised.
        """
        return '#REDIRECT' in text.upper()

    @staticmethod
    def _dump(path, payload):
        """Pickle *payload* to *path*, creating the parent directory first if it does not exist."""
        target = Path(path)
        target.parent.mkdir(parents=True, exist_ok=True)
        with target.open('wb') as fp:
            pickle.dump(payload, fp, pickle.HIGHEST_PROTOCOL)

    # ------------------------------------------------------------------ API access

    def query(self, params) -> dict:
        """Submit an API request and return its result, retrying on connection errors.

        Both building the request and submitting it are covered by the retry. After a failed attempt the
        progress is backed up and the call sleeps before trying again.

        :param params: API parameters handed to ``api.Request``.
        :raises ConnectionError: when the last allowed attempt fails as well.
        """
        for attempt in range(1, self._MAX_ATTEMPTS + 1):
            self.queries += 1
            try:
                data = api.Request(self.site, parameters=params).submit()
            except ConnectionError as err:
                self.retries = attempt
                print(f'params: {params}')
                self.backup()
                if attempt == self._MAX_ATTEMPTS:
                    print(f'ErrConn {err}, giving up')
                    raise ConnectionError(f'Unable to continue after {self._MAX_ATTEMPTS} retries') from err
                wait = self.threads * 10 + attempt * 10
                print(f'ErrConn {err}, sleeping {wait}')
                time.sleep(wait)
            else:
                self.retries = 0
                return data

    def check_wait(self, username):
        """Block while ``buffer_size`` exceeds ``threads * limit``, backing up progress on every pause.

        :param username: only used in the progress message.
        """
        while self.buffer_size > self.threads * self.limit:
            wait = self.threads * 2
            print(self._status_line(f'!!!! WAITING FOR {wait} sec. u: {username}'))
            self.backup()
            time.sleep(wait)

    def get_created_article_list(self, username):
        """Yield an ``Article`` for every main-namespace page that *username* created.

        ``buffer_size`` grows by the size of each fetched batch and shrinks by one per yielded article.
        """
        params = self._created_pages_params(username, self.limit)
        for batch in self._iter_batches(params, 'usercontribs', 'uccontinue', throttle_user=username):
            self.buffer_size += len(batch)
            for contrib in batch:
                self.buffer_size -= 1
                yield Article(contrib['title'])

    def count_created_article(self, username):
        """Return how many main-namespace pages *username* created, following every continuation."""
        params = self._created_pages_params(username, 'max')
        return sum(len(batch) for batch in self._iter_batches(params, 'usercontribs', 'uccontinue'))

    def _user_is_active(self, username):
        """
        Tell whether the latest contribution of *username* is at most three months old.

        https://ca.wikipedia.org/w/api.php?action=query&list=usercontribs&ucuser=Coet&ucprop=title|timestamp&uclimit=1
        :return: False when the latest contribution is older than three months or when the API lists none.
        """
        from datetime import timezone  # function-scope import: the module itself only imports ``datetime``

        params = {
            'action': 'query',
            'list': 'usercontribs',
            'ucuser': username,
            'ucprop': 'title|timestamp',
            'uclimit': 1
        }
        contribs = self.query(params)['query']['usercontribs']
        if not contribs:
            return False
        last_contrib = datetime.strptime(contribs[0]['timestamp'], '%Y-%m-%dT%H:%M:%SZ')
        # The trailing 'Z' marks the timestamp as UTC, so compare against the current UTC time (naive on both
        # sides) rather than against local time.
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        return now_utc <= last_contrib + relativedelta(months=3)

    def _all_users(self):
        """
        Yield every local user that has edits, bots excluded, following the API continuation to the end.

        https://ca.wikipedia.org/w/api.php?action=help&modules=query%2Ballusers
        """
        params = {
            "action": 'query',
            "list": 'allusers',
            "aulimit": 'max',
            "auwitheditsonly": '',
            # "auactiveusers": '',
            "auprop": ('editcount', 'groups'),
            "auexcludegroup": ('bot',),
            "rawcontinue": ''
        }
        for batch in self._iter_batches(params, 'allusers', 'aufrom'):
            yield from batch

    # ------------------------------------------------------------------ users

    def get_user(self, user):
        """Register *user* as a candidate when it has more than 1000 edits and is active.

        :param user: user mapping as yielded by :meth:`_all_users`.
        :return: the new ``Creator`` when it was appended to ``users``, otherwise ``None``.
        """
        active_user = Creator()
        active_user.set(user)
        if active_user.edit_count > 1000 and self._user_is_active(active_user.name):
            self.users.append(active_user)
            return active_user
        return None

    def all_users(self):
        """Fill ``users`` from the complete user list and save it."""
        for user in self._all_users():
            self.get_user(user)
        self.save()

    # ------------------------------------------------------------------ contributions

    def _register_creation(self, user, article, first_edit):
        """Classify how *article* was created and bump the matching counter of *user*.

        :return: ``'redirect'``, ``'disambig'`` or ``'article'``.
        """
        if any(tag in self.ct_tags for tag in first_edit.tags):
            first_edit.is_ContentTranslate = True
            article.is_content_translate = True

        if self._is_redirect(first_edit.content):
            # The page was created as a redirect: it does not count as a creation.
            first_edit.is_redir = True
            user.redirects += 1
            return 'redirect'
        if self._DISAMBIG_RE.search(first_edit.content):
            # The page was created as a disambiguation page; counted separately.
            first_edit.is_disambig = True
            user.disambigs += 1
            return 'disambig'
        user.creations += 1
        return 'article'

    @classmethod
    def _review_current_state(cls, user, article, last_edit, was_redirect):
        """Record what the current text of *article* says: turned into a redirect, or carrying listed templates."""
        if cls._is_redirect(last_edit.content):
            last_edit.is_redir = True
            if not was_redirect:
                # The page has been turned into a redirect after its creation.
                user.old_article_is_current_redirect += 1
            return
        for template in article.template_list:
            if template.name in article.warning_templates:
                article.add_template(template)
            elif template.name in article.warning2_templates:
                article.add_countable_template(template)
            if template.name == 'Desambiguació':
                last_edit.is_disambig = True

    def get_user_contribs(self, user: 'Creator') -> int:
        """Inspect every page created by *user*, update its counters and append it to ``ready_users``.

        ``threads`` is incremented for the duration of the call and always decremented again, even when the
        inspection raises, because :meth:`check_wait` and :meth:`query` derive their waits from it.

        :return: number of created pages that were inspected.
        """
        self.checked_users += 1
        self.threads += 1
        loops = 0
        created_articles = []
        try:
            print(self._status_line(f'>>>> {user.name}'))
            for loops, article in enumerate(self.get_created_article_list(user.name), 1):
                first_edit = article.first_edit
                last_edit = article.last_edit
                self.queries += 1

                kind = self._register_creation(user, article, first_edit)
                if kind == 'article':
                    created_articles.append(article)

                article.set_templates()
                self._review_current_state(user, article, last_edit, was_redirect=(kind == 'redirect'))

            content_translate_articles = user.set_content_translate_articles(created_articles)
            user.set_weak_articles(content_translate_articles)
            print(self._status_line(f'{loops:>4} {user.name}'))
            self.ready_users.append(user)
        finally:
            self.threads -= 1
        return loops

    def all_contribs(self):
        """Run :meth:`get_user_contribs` in a thread pool for every user not yet in ``ready_users``.

        The progress is backed up and ``end_time`` is set even when a worker raises; the first worker exception
        is then propagated to the caller.
        """
        print(f'[{datetime.now():%H:%M:%S}] ALL CONTRIBS METH BEGINS')
        checked_names = {user.name for user in self.ready_users}
        pending = [user for user in self.users if user.name not in checked_names]
        try:
            with ThreadPoolExecutor(self._MAX_WORKERS) as executor:
                futures = [executor.submit(self.get_user_contribs, user) for user in pending]
                for future in as_completed(futures):
                    future.result()
        finally:
            self.backup()
            self.end_time = datetime.now()

    def all_contribs_one_by_one(self):
        """Sequential variant of :meth:`all_contribs`: inspect every entry of ``users`` in order."""
        print(f'[{datetime.now():%H:%M:%S}] ALL CONTRIBS METH BEGINS')
        for user in self.users:
            self.get_user_contribs(user)

    # ------------------------------------------------------------------ reporting and persistence

    def show(self):
        """Print numbered listings of ``users`` and ``ready_users``."""
        print("\nSHOW\nUSERS")
        for position, user in enumerate(self.users, 1):
            print(f'[{datetime.now():%H:%M:%S}] {position:>3}.- {user}')
        print("\nREADY USERS")
        for position, user in enumerate(self.ready_users, 1):
            print(f'[{datetime.now():%H:%M:%S}] {position:>3}.- {user}')

    def save(self):
        """Pickle ``users`` to ``resources/creators.bin``."""
        self._dump(self._USERS_FILE, self.users)

    def load(self):
        """Load ``users`` and, if its backup file exists, ``ready_users`` from the pickle files.

        NOTE: ``pickle.load`` can execute arbitrary code; only load files written by this script.

        :raises FileNotFoundError: when ``resources/creators.bin`` does not exist.
        """
        with open(self._USERS_FILE, 'rb') as fp:
            self.users = pickle.load(fp)

        ready_users_backup_file = Path(self._READY_USERS_FILE)
        if ready_users_backup_file.exists():
            with ready_users_backup_file.open('rb') as fp:
                self.ready_users = pickle.load(fp)

    def backup(self):
        """Pickle ``ready_users`` to ``resources/checked_creators.bin``."""
        self._dump(self._READY_USERS_FILE, self.ready_users)

    def remove_checked_users(self):
        """Drop from ``users`` everyone whose name is already in ``ready_users`` and save the result."""
        checked_names = {user.name for user in self.ready_users}
        self.users = [user for user in self.users if user.name not in checked_names]
        self.save()

    # ------------------------------------------------------------------ entry points

    def run(self):
        """Full run: build the user list if it is empty, inspect all contributions, save and report."""
        if not self.users:
            self.all_users()
        self.all_contribs()
        self.save()
        self.show()
        print(f'[{datetime.now()}] FINISH! queries: {self.queries} users: {len(self.users)}')
        et = relativedelta(self.end_time, self.start_time)
        print(
            f'started at: {self.start_time:%H:%M:%S} elapsed_time: {et.days}d {et.hours}h {et.minutes}m {et.seconds}s'
        )

    def resume(self):
        """Continue an earlier run: load the saved state, skip the users already inspected, then :meth:`run`."""
        self.load()
        self.remove_checked_users()
        self.run()


def article_test():
    """Manual smoke test: print id, author and timestamp of the first and the last revision of 'Betacisme'."""
    article = Article('Betacisme')
    # Each access to first_edit / last_edit builds a Revision from a new self.revisions(...) call, so every
    # property is read exactly once instead of three times.
    for revision in (article.first_edit, article.last_edit):
        print(revision.id, revision.username, revision.timestamp)


if __name__ == '__main__':
    # Command line: three independent boolean switches, registered in the order -L, -S, -R.
    arg_parser = ArgumentParser()
    for short_flag, long_flag, target in (
        ('-L', '--load', 'load'),
        ('-S', '--show', 'show'),
        ('-R', '--resume', 'resume'),
    ):
        arg_parser.add_argument(short_flag, long_flag, dest=target, action='store_true')
    args = arg_parser.parse_args()

    ac = ArticleCreators()
    if args.load:
        ac.load()
    # --show wins over --resume; when neither is given a full run is started.
    action = ac.show if args.show else (ac.resume if args.resume else ac.run)
    action()