Usuari:CobainBot/scripts/cawiki/editcounter.py
Aparença
"""
(!C) 2023 Coet.
Copyleft: Feu el que vos convinga en sóc l'autor però decline tota responsabilitat i no m'importa el que feu amb
la integralitat o alguna part del codi. Si el conjunt o part d'eines, classes i mètodes, vos resulten útils i
podeu tornar a gastar-los no us perseguiré demanant-ne res i no m'importa si heu de canviar la llicència o autoria :)
L'editcounter pretén mostrar el nombre de cada usuari en una llista dels primers 500 amb més edicions. És fet en un sol
mòdul, tot i que té dependències d'altres mòduls de l'estàndar.
L'editcounter substitueix altres versions anteriors i incorpora noves funcionalitats com és la d'indicar l'increment
diari basat en les dades de l'anterior execució, que en un principi és del dia anterior. Compta amb uns mètodes que
permeten iniciar el comptador amb les dades que hi ha penjades a la pàgina si no es disposara de cap fitxer on s'hagen
emmagatzemat les dades de l'execució anterior.
En una primera execució, sent que ja existeixen dades a les pàgines de la Viquipèdia caldrà utilitzar el mètode .start()
per a poder comptar amb dades de l'execució anterior.
Tenint un fitxer amb dades de la darrera execució és recomanable utilitzar el mètode .run().
No s'ha implementat un mètode en cas de no tindre unes dades inicials.
"""
import json
import locale
import pickle
import re
from datetime import datetime
from pathlib import Path
from typing import Dict, List, NoReturn, Optional, Union, Tuple
from dateutil.relativedelta import relativedelta
from pywikibot import Page, Site
from pywikibot.data import api
class User:
"""
La classe User proveïx un objecte amb els cinc atributs que ens caldran per a omplir
la taula d'usuaris amb les seues dades.
"""
def __init__(self):
self.name: str = ''
self.previous_edits: int = 0
self.current_edits: int = 0
self.last_timestamp: Optional[datetime] = None
self.group: str = ''
@property
def diff(self):
return self.current_edits - self.previous_edits
def set(self, name: str, cur_ec: int, groups: List[str]):
"""
Mètode simplista, es recomana utilitzar .set_by_dict amb dades que ens
proporcionarà l'API.
:param name: str, nom d'usuari.
:param cur_ec: int, nombre d'edicions actual.
:param groups: str list, groups al que pertany l'usuari.
:return:
"""
self.name = name
self.current_edits = cur_ec
groups = list(filter(lambda g: g in EditCounter.special_groups, groups))
if groups:
self.group = groups[0]
def set_by_dict(self, usr_dict):
"""
Passem un diccionari amb les claus 'name', 'editcount', groups'. L'API ens proporciona
eixe tipus de dades o es pot emular si ens convé.
:param usr_dict:
:return:
"""
self.set(usr_dict['name'], usr_dict['editcount'], usr_dict['groups'])
def set_last_ec(self, last_ec: int):
self.previous_edits = last_ec
def set_last_ts(self, last_ts):
self.last_timestamp = datetime.strptime(last_ts, '%Y-%m-%dT%H:%M:%SZ')
def reset_edits(self):
"""
Mètode per a invertir les dades de les quals disposem. Traspassem les dades dels atributs
de current_edits a previous_edits i deixem current_edits a 0 per a obtenir el valor
actual en un mètode posterior.
Tornem el propi objecte en cas d'iteració.
"""
self.current_edits, self.previous_edits = 0, self.current_edits
return self
def __repr__(self):
group = self.group
if group:
group = f' {group}'
if not self.last_timestamp:
return f'<User {self.name} {self.current_edits} {self.previous_edits} (dif: {self.diff}){group}>'
return f'<User {self.name} {self.current_edits} {self.previous_edits} (dif: {self.diff}){group} ' \
f'{self.last_timestamp:%Y-%m-%d %H:%M:%S}>'
def to_dict(self):
"""
Passem els atributs de la classe a diccionari amb la finalitat de poder emmagatzemar-los
en un fitxer .json, ja que Python pot serialitzar fàcilment les dades d'un diccionari a
objecte json.
:return:
"""
return {
'name': self.name,
'current_edits': self.current_edits,
'previous_edits': self.previous_edits,
'last_timestamp': f'{self.last_timestamp:%Y-%m-%dT%H:%M:%SZ}' if self.last_timestamp else '',
'group': self.group
}
def to_json(self):
return json.dumps(self.to_dict())
def from_json(self, json_data):
"""
Obtenim les dades emmagatzemades en format json per a donar els valors als atributs de
la classe. Tornem l'objecte en cas d'iteració.
:param json_data: objecte json amb els atributs homònims de la classe.
:return: self.
"""
self.name = json_data['name']
self.current_edits = json_data['current_edits']
self.previous_edits = json_data['previous_edits']
self.group = json_data['group']
if json_data['last_timestamp']:
self.set_last_ts(json_data['last_timestamp'])
return self
class EditCounter:
"""
Classe principal.
A l'atribut data li passem tots els usuaris amb un mínim d'edicions, ara per ara 1700, tant si son bots com
admins o usuaris que no pertanyen a estos dos grups. De manera que tenim totes les dades en una sola variable.
Quan anem a editar la pàgina excloent els bots, només hem de descartar estos ja que la classe User es queda amb
el grup al qual pertany.
Es pot evitar l'edició (modificació de la pàgina) o el desament de dades passant edit=False i save=False
respectivament.
"""
special_groups = ('sysop', 'bot') # Són els grups que ens interessen
def __init__(self, edit=True, save=True):
locale.setlocale(locale.LC_TIME, 'Catalan_Andorra.utf-8')
self.data: List[User] = []
self.init_time: datetime = datetime.now()
self.site: Site = Site('ca', 'wikipedia', 'CobainBot')
self.title: str = "Viquipèdia:Llista de viquipedistes per nombre d'edicions"
self.page: Page = Page(self.site, self.title)
self.subpages: Tuple[str, ...] = ('exclude', 'unflaggedbots')
self.excluded_users: List[str] = []
self.excluded_bots: List[str] = []
self.previous_data: Dict[str, Dict[str, Union[bool, int]]] = {}
self.min_edits: int = 1700 # Ha de ser inferior a les edicions de l'últim usuari/ària.
self.capacity = 500 # Nombre d'usuaris a llistar.
self.edit_counter_dict: Dict[int, List[User]] = {}
self.last_running: Optional[datetime] = None
self.summary: str = 'Bot: actualització'
self.inactive_delay = 90
self.requests: int = 0
self.edit_bool = edit
self.save_bool = save
self.cached_allusers = False
# region API Queries
def query(self, params) -> dict:
qry = api.Request(self.site, parameters=params)
self.requests += 1
return qry.submit()
def get_page_content(self, title: str = None, revisions: int = 1) -> str:
"""
Obtenim el contingut d'una pàgina agafant la darrera revisió sempre que revisions siga 1, que n'és el valor per
defecte. Si és major, sempre s'agafa la darrera revisió obtinguda de la llista de revisions.
"""
if not title:
title = self.title
params = {
'action': 'query',
'prop': 'revisions',
'titles': title,
'rvslots': '*',
'rvprop': 'content|timestamp|comment|user',
'rvlimit': revisions,
'rvdir': 'older',
'indexpageids': ''
}
data = self.query(params)
pageid = data['query']['pageids'][0]
timestamp = data['query']['pages'][pageid]['revisions'][revisions - 1]['timestamp']
if not self.last_running:
self.last_running = datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ')
return data['query']['pages'][pageid]['revisions'][revisions - 1]['slots']['main']['*']
def get_last_update(self, title: str = None, revisions: int = 5, summary='') -> str:
"""
Obtenim el contingut d'una pàgina agafant la darrera revisió que siga del bot. En principi descarregant només
les darreres 5 revisions, si no s'obté cap revisió es tornarà el contingut com a cadena buida. PER FER: En eixe
cas caldria resoldre com es gestiona esta mancança de dades.
"""
if not title:
title = self.title
if not summary:
summary = self.summary
params = {
'action': 'query',
'prop': 'revisions',
'titles': title,
'rvslots': '*',
'rvprop': ('comment', 'content', 'timestamp', 'user'),
'rvlimit': revisions,
'rvdir': 'older',
'indexpageids': ''
}
data = self.query(params)
pageid = data['query']['pageids'][0]
revisions = data['query']['pages'][pageid]['revisions']
content = ''
for revision in revisions:
timestamp = revision['timestamp']
user = revision['user']
comment = revision['comment']
# El nom del propi bot és site.username() o simplement el que ens passa l'API :)
if user == data['query']['userinfo']['name'] and comment == summary:
content = revision['slots']['main']['*']
if not self.last_running:
self.last_running = datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ')
break
return content
def get_last_timestamp(self, username: str) -> str:
"""
Agafem la data i l'hora de la darrera edició de l'usuari.
:param username: str.
:return: str.
"""
params = {
'action': 'query',
'list': 'usercontribs',
'ucuser': username,
'ucprop': 'timestamp',
'uclimit': 1
}
data = self.query(params)
return data['query']['usercontribs'][0]['timestamp']
def get_user_contribs(self, username: str, start: datetime, end: datetime) -> Dict[str, Union[str, int]]:
"""
Agafem les contribucions de les darreres 24 hores de l'usuari.
El mètode està pensat per si apareix algun nou usuari al final de la llista en cas que no tinguem les
seues dades, això només ocorre en cas que hem agafat les dades de l'execució anterior de la pàgina i no
del fitxer de dades emmagatzemades.
Es diu que són de les darreres 24 hores però podria ser d'un periode més gran mentre ho permeten les
dades proporcionades per l'API, el mètode no és recursiu.
"""
params = {
'action': 'query',
'list': 'usercontribs',
'ucstart': f'{start:%Y-%m-%dT%H:%M:%SZ}',
'ucend': f'{end:%Y-%m-%dT%H:%M:%SZ}',
'ucuser': username,
'ucprop': 'timestamp',
'uclimit': 'max'
}
data = self.query(params)
contribs = len(data['query']['usercontribs'])
timestamp = data['query']['usercontribs'][0]['timestamp'] if contribs else self.get_last_timestamp(username)
return {'edits': contribs, 'last': timestamp}
def _all_users(self) -> NoReturn:
"""
Mètode recursiu per poder obtenir tots els usuaris i usuàries locals de la VP. Amb recursiu, s'entén que
tècnicament fa referència a iteratiu amb l'afegit que mentre hi queden elements el mètode no s'atura. És
a dir, utilitze el terme en un sentit més col·loquial que tècnic, ja que recursiu voldria dir que el mètode
es torna a cridar a ell mateix metre queden elements.
"""
aufrom = True # Per superar el while...
users = 0
params = {
"action": 'query',
"list": 'allusers',
"aulimit": 'max',
"auwitheditsonly": '',
"auprop": ('editcount', 'groups'),
"rawcontinue": ''
}
while aufrom:
if isinstance(aufrom, str):
params['aufrom'] = aufrom
data = self.query(params)
for user in data['query']['allusers']:
users += 1
yield user
print(f"{datetime.now():%H:%M:%S} {users}")
aufrom = data['query-continue']['allusers']['aufrom'] if "query-continue" in data else None
def get_userinfo(self, username):
"""
Mètode per obtenir les dades d'un usuari concret. No es recomana el seu ús si s'ha de fer
per a multiples comptes, caldria un mètode on username fóra la llista d'usuaris de manera que
en una sola consulta els poguerem obtenir.
:param username:
:return:
"""
params = {
'action': 'query',
'list': 'users',
'usprop': ('editcount', 'groups'),
'ususers': username
}
data = self.query(params)
return data['query']['users'][0]
# endregion
# region Requests
def get_all_users(self) -> NoReturn:
"""
No tenim dades emmagatzemades així que tots els usuaris són nous.
Ací els usuaris exclosos els ajuntem amb els bots
"""
if self.cached_allusers:
with open('resources/allusers.bin', 'rb') as fp:
self.edit_counter_dict = pickle.load(fp)
return
excluded_users = []
excluded_users.extend(self.excluded_users)
excluded_users.extend(self.excluded_bots)
users_by_num_of_edits = {}
for usr_dict in self._all_users():
if usr_dict['name'] in excluded_users:
continue
if usr_dict['editcount'] < self.min_edits:
continue
user = User()
user.set_by_dict(usr_dict)
if user.current_edits not in users_by_num_of_edits:
users_by_num_of_edits[user.current_edits] = []
users_by_num_of_edits[user.current_edits].append(user)
self.edit_counter_dict = users_by_num_of_edits
with open('resources/allusers.bin', 'wb') as fp:
pickle.dump(users_by_num_of_edits, fp, pickle.HIGHEST_PROTOCOL)
def get_user(self, usr_dict: dict) -> User:
"""
Obtenim l'usuari de les dades emmagatzemades, en principi l'usuari ha d'existir.
:param usr_dict: dict, diccionari amb dades d'usuari.
:return: user: User.
"""
for user in self.data:
if user.name == usr_dict['name']:
return user
# L'usuari no existeix
user = User()
user.set_by_dict(usr_dict)
return user
def get_all_users_but_update(self):
"""
Com get_all_users però utilitzat quan disposem de dades emmagatzemades.
:return:
"""
if self.cached_allusers:
with open('resources/allusers.bin', 'rb') as fp:
self.edit_counter_dict = pickle.load(fp)
return
excluded_users = []
excluded_users.extend(self.excluded_users)
excluded_users.extend(self.excluded_bots)
users_by_num_of_edits = {}
for usr_dict in self._all_users():
if usr_dict['name'] in excluded_users:
continue
if usr_dict['editcount'] < self.min_edits:
continue
user = self.get_user(usr_dict)
user.current_edits = usr_dict['editcount']
if user.current_edits not in users_by_num_of_edits:
users_by_num_of_edits[user.current_edits] = []
users_by_num_of_edits[user.current_edits].append(user)
self.edit_counter_dict = users_by_num_of_edits
with open('resources/allusers.bin', 'wb') as fp:
pickle.dump(users_by_num_of_edits, fp, pickle.HIGHEST_PROTOCOL)
def get_excluded_users(self) -> NoReturn:
"""
Agafem usuaris que hi ha a les subpàgines.
"""
pattern = re.compile(r'\[\[Us(?:uari|ària|er):(?P<username>[^\]]+)\]\]')
for i, subpage in enumerate(self.subpages):
title = f'{self.title}/{subpage}'
content = self.get_page_content(title)
if i == 1:
self.excluded_users = pattern.findall(content)
elif i == 2:
self.excluded_bots = pattern.findall(content)
def get_data_from_content(self, content: str) -> Dict[str, Dict[str, Union[bool, int]]]:
match = re.search(r'\{\{/begin\|\d+\|(?P<lastrunning>[^|]+)\|(?P<previousrunning>[^}]+)\}\}', content)
if match:
timestamp = match.group('lastrunning')
self.last_running = datetime.strptime(timestamp, '%H:%M, %d %b %Y')
pattern = re.compile(
r'\[\[User:(?P<username>[^|]+)\|[^\]]+\]\](?: \((?P<role>admin|bot)\))? \|\| '
r'\[\[Special:Contributions/[^|]+\|(?P<edits>\d+)'
)
return {
it.group('username'): {
'admin': it.group('role') == 'admin',
'bot': it.group('role') == 'bot',
'edits': int(it.group('edits'))
}
for it in pattern.finditer(content)
}
# endregion
# region Previous data from wikipages
def get_previous_data(self) -> NoReturn:
"""
Agafem les dades de la darrera execució.
"""
content = self.get_last_update()
self.previous_data = self.get_data_from_content(content)
title = f'{self.title} (bots inclosos)'
content = self.get_last_update(title)
data = self.get_data_from_content(content)
self.previous_data.update(data)
def get_last_24h_edits(self, username: str, cur_eds: int) -> int:
"""
Este mètode ens servix en cas que un usuari supera l'últim de la llista.
:param username: str, nom de l'usuari.
:param cur_eds: int, nombre d'edicions actuals.
:return: int, nombre d'edicions.
"""
start = self.init_time
end = start - relativedelta(days=1)
data = self.get_user_contribs(username, start, end)
return cur_eds - data['edits']
def set_previous_edits(self) -> NoReturn:
"""
Mètode que ordena els usuaris de major a menor segons el nombre d'edicions.
"""
for num_of_edits, users in sorted(self.edit_counter_dict.items(), reverse=True):
for user in users:
if user.current_edits >= self.min_edits:
prev_data = self.previous_data.get(user.name, -1)
if prev_data == -1:
prev_data = {'edits': self.get_last_24h_edits(user.name, user.current_edits)}
user.previous_edits = prev_data['edits']
user.set_last_ts(self.get_last_timestamp(user.name))
self.data.append(user)
# endregion
# region Edit results
def set_contents(self) -> Dict[str, List[str]]:
timestamps = {
'it': f'{self.init_time:%H:%M, %#d %b %Y}',
'lr': f'{self.last_running:%H:%M, %#d %b %Y}',
'c': self.capacity
}
content = ["{{/begin|%(c)i|%(it)s|%(lr)s}}" % timestamps]
content_without_bots = ["{{/begin|%(c)i|%(it)s|%(lr)s}}" % timestamps]
i = 0
j = k = 1
groups = {'sysop': 'admin', 'bot': 'bot'}
for user in self.data:
group_str = ''
if user.group:
group_str = f' ({groups[user.group]})'
td = relativedelta(self.init_time, user.last_timestamp.replace(tzinfo=None))
fmt_usr = f'<span style="color:gray">{user.name}</span>' if \
(td.years * 365) + (td.months * 30) + td.days >= self.inactive_delay else user.name
growth = user.diff
growth = f"+{growth}" if growth > 0 else \
f'<span style="color:gray">{growth}</span>' if growth == 0 else \
f'<span style="color:red">{growth}</span>'
if j <= self.capacity:
content.append(
f"|-\n| {j} || [[User:{user.name}|{fmt_usr}]]{group_str} || "
f"[[Special:Contributions/{user.name}|{user.current_edits}]] || "
f"{user.last_timestamp:%Y-%m-%d %H:%M} || {growth}"
)
j += 1
if user.group == 'bot':
continue
if k <= self.capacity:
content_without_bots.append(
f"|-\n| {k} || [[User:{user.name}|{fmt_usr}]]{group_str} || "
f"[[Special:Contributions/{user.name}|{user.current_edits}]] || "
f"{user.last_timestamp:%Y-%m-%d %H:%M} || {growth}"
)
k += 1
i += 1
if j > self.capacity and k > self.capacity:
break
return {'bots': content, 'people': content_without_bots}
def edit_headers(self):
for title in (self.title, f'{self.title} (bots inclosos)'):
page = Page(self.site, f'{title}/begin')
content = page.get()
inc = f'!title="darrera execució: {self.last_running:%H:%M, %#d %b %Y}"| increment'
content = re.sub(r'!title=".*"[^|]+\| increment', inc, content)
page.put(content, 'Bot: inserció dades darrera execució')
def edit(self):
contents = self.set_contents()
people = '\n'.join(contents['people'])
if self.edit_bool:
self.page.put(people, self.summary)
else:
print(people)
bots = '\n'.join(contents['bots'])
page = Page(self.site, f'{self.title} (bots inclosos)')
if self.edit_bool:
page.put(bots, self.summary)
# endregion
# region Data handling.
def sort(self) -> NoReturn:
"""
Quan ja tenim les dades ordernades de major a menor depenent del nombre d'edicions i també comptem amb la data
de la darrera edició, fem que qui tinga el mateix nombre d'edicions aparega primer si ho va aconseguir abans.
Ordenem per nombre d'edicions (descendent) i en segon lloc per data (ascendent).
"""
self.data.sort(
key=lambda usr: (usr.current_edits, -usr.last_timestamp.timestamp()), reverse=True
)
def save(self):
if not self.save_bool:
return
data = [user.to_dict() for user in self.data]
with open(f'resources/editcounter_{self.init_time:%Y-%m-%d_%H-%M-%S}.json', 'w') as fp:
json.dump(data, fp)
def load(self):
# Agafem el fitxer més recent d'editcounter.
newest_filepath = max(Path().glob('resources/editcounter_*.json'), key=lambda x: x.stat().st_ctime)
self.last_running = datetime.strptime(newest_filepath.stem.replace('editcounter_', ''), '%Y-%m-%d_%H-%M-%S')
with open(newest_filepath, 'r') as fp:
data = json.load(fp)
if data:
self.data = [User().from_json(user) for user in data]
def store(self):
"""
Mètode que ens permet emmagatzemar les dades que hi ha penjades a les pàgines.
Ideal si no disposem d'un fitxer editcounter_*.json.
"""
content = self.get_page_content()
data = self.get_data_from_content(content)
title = f'{self.title} (bots inclosos)'
content = self.get_page_content(title)
data.update(self.get_data_from_content(content))
for username, props in data.items():
user = User()
usr_dict = {
'name': username,
'groups': ['sysop' if props['admin'] else 'bot' if props['bot'] else ''],
'editcount': props['edits']
}
user.set_by_dict(usr_dict)
self.data.append(user)
self.save()
# endregion
# region Initializers
def start(self) -> NoReturn:
"""
Mètode que emprarem quan no disposem d'un fitxer amb dades.
Ideal per a una primera execució, obtenim les dades prèvies de la pàgina que ja s'ha editat. També
en cas de perdre o haver eliminat els fitxers emmagatzemats.
Si tenim fitxers a la carpeta resources amb el nom editcounter_*.json, podrem utilitzar el mètode
run().
"""
self.get_excluded_users()
self.get_previous_data()
self.get_all_users()
self.set_previous_edits()
self.sort()
self.edit()
def reset_edits(self) -> NoReturn:
"""
Mètode per invertir els valors entre current_edits i previous_edits, i deixar currents_edits a 0
per a després donar els nous valors amb el mètode get_all_users_but_update().
"""
self.data = [user.reset_edits() for user in self.data]
def set_new_data(self):
"""
Mètode per ordernar les dades dels usuaris per nombre d'edicions de major a menor. Semblant a set_previous_edits
quan no disposem de dades emmagatzemades.
:return:
"""
self.data = []
for num_of_edits, users in sorted(self.edit_counter_dict.items(), reverse=True):
for user in users:
if user.current_edits >= self.min_edits:
user.set_last_ts(self.get_last_timestamp(user.name))
self.data.append(user)
def run(self) -> NoReturn:
"""
Mètode principal, l'emprarem quan ja disposem de fitxer de les dades emmagatzemades a la
carpeta 'resources'.
"""
self.get_excluded_users()
self.load()
self.reset_edits()
self.get_all_users_but_update()
self.set_new_data()
self.sort()
self.save()
self.edit()
rd = relativedelta(datetime.now(), self.init_time)
elapsed = f'{rd.hours}:{rd.minutes}:{rd.seconds}.{rd.microseconds}'
print('last running', self.last_running)
print(f'[{datetime.now():%H:%M:%S}] requests: {self.requests} users: {len(self.data)} elapsed time: {elapsed}')
# endregion
# region Tests
def test(self) -> NoReturn:
self.get_previous_data()
for username, _ in self.previous_data.items():
edits = editcounter.get_last_24h_edits(username, 0)
last_ts = self.get_last_timestamp(username)
print(username, edits*-1, last_ts)
def test_user(self) -> NoReturn:
username = 'Coet'
user = User()
usr_dict = self.get_userinfo(username)
user.set_by_dict(usr_dict)
user.set_last_ts(self.get_last_timestamp(username))
print(user)
def test_all_users(self):
for user in self._all_users():
print(user)
break
def test_load(self):
self.load()
print(len(self.data), self.data)
# endregion
if __name__ == '__main__':
editcounter = EditCounter(edit=False, save=False)
editcounter.run()