Skip to content
Snippets Groups Projects
page.py 15.3 KiB
Newer Older
import client
import errors
import listing
import compatibility
from page_nowriteapi import OldPage

import urllib
import urlparse
import time

class Page(object):
    def __init__(self, site, name, info=None, extra_properties={}):
        if type(name) is type(self):
            return self.__dict__.update(name.__dict__)
        self.site = site
        self.name = name
        self.section = None
        if not info:
            if extra_properties:
                prop = 'info|' + '|'.join(extra_properties.iterkeys())
                extra_props = []
                [extra_props.extend(extra_prop) for extra_prop in extra_properties.itervalues()]
            else:
                prop = 'info'
                extra_props = ()
            if type(name) is int:
                info = self.site.api('query', prop=prop, pageids=name,
                                     inprop='protection', *extra_props)
                info = self.site.api('query', prop=prop, titles=name,
                                     inprop='protection', *extra_props)
            info = info['query']['pages'].itervalues().next()
        self._info = info
        self.namespace = info.get('ns', 0)
        self.name = info.get('title', u'')
        if self.namespace:
            self.page_title = self.strip_namespace(self.name)
        else:
            self.page_title = self.name
        self.touched = client.parse_timestamp(info.get('touched', '0000-00-00T00:00:00Z'))
        self.revision = info.get('lastrevid', 0)
        self.exists = 'missing' not in info
        self.length = info.get('length')
        self.protection = dict([(i['type'], (i['level'], i['expiry'])) for i in info.get('protection', ()) if i])
        self.redirect = 'redirect' in info
        self.last_rev_time = None
        self.edit_time = None
    def redirects_to(self):
        """ Returns the redirect target page, or None if the page is not a redirect page."""
        info = self.site.api('query', prop='pageprops', titles=self.name, redirects='')['query']
        if 'redirects' in info:
            for page in info['redirects']:
                if page['from'] == self.name:
                    return Page(self.site, page['to'])
            return None
        else:
            return None
    def resolve_redirect(self):
        """ Returns the redirect target page, or the current page if it's not a redirect page."""
        target_page = self.redirects_to()
            return self
        else:
            return target_page
    def __repr__(self):
        return "<Page object '%s' for %s>" % (self.name.encode('utf-8'), self.site)
    def __unicode__(self):
        return self.name
    @staticmethod
    def strip_namespace(title):
        if title[0] == ':':
            title = title[1:]
        return title[title.find(':') + 1:]
    @staticmethod
    def normalize_title(title):
        # TODO: Make site dependent
        title = title.strip()
        if title[0] == ':':
            title = title[1:]
        title = title[0].upper() + title[1:]
        title = title.replace(' ', '_')
        return title
    def can(self, action):
        level = self.protection.get(action, (action, ))[0]
        if level == 'sysop':
            level = compatibility.protectright(self.site.version)
        return level in self.site.rights
    def get_token(self, type, force=False):
        self.site.require(1, 11)
        if type not in self.site.tokens:
            self.site.tokens[type] = '0'
        if self.site.tokens.get(type, '0') == '0' or force:
            info = self.site.api('query', titles=self.name,
                                 prop='info', intoken=type)
            for i in info['query']['pages'].itervalues():
                if i['title'] == self.name:
                    self.site.tokens[type] = i['%stoken' % type]
        return self.site.tokens[type]
    def get_expanded(self):
        self.site.require(1, 12)
        revs = self.revisions(prop='content', limit=1, expandtemplates=True)
        try:
            return revs.next()['*']
        except StopIteration:
            return u''
    def edit(self, section=None, readonly=False):
        """Returns wikitext for a specified section or for the whole page.
        Retrieves the latest edit.
        """
        if not self.can('read'):
            raise errors.InsufficientPermission(self)
        if not self.exists:
            return u''
        revs = self.revisions(prop='content|timestamp', limit=1, section=section)
        try:
            rev = revs.next()
            self.text = rev['*']
            self.section = section
            self.last_rev_time = rev['timestamp']
        except StopIteration:
            self.text = u''
            self.section = None
            self.edit_time = None
        self.edit_time = time.gmtime()
        return self.text
    def save(self, text=u'', summary=u'', minor=False, bot=True, section=None, **kwargs):
        """Save text of page."""
        if not self.site.logged_in and self.site.force_login:
            # Should we really check for this?
            raise errors.LoginError(self.site)
        if self.site.blocked:
            raise errors.UserBlocked(self.site.blocked)
        if not self.can('edit'):
            raise errors.ProtectedPageError(self)
        if not text:
            text = self.text
        if not section:
            section = self.section
        if not self.site.writeapi:
            return OldPage.save(self, text=text, summary=summary, minor=False)
        if minor:
            data['minor'] = '1'
        if not minor:
            data['notminor'] = '1'
        if self.last_rev_time:
            data['basetimestamp'] = time.strftime('%Y%m%d%H%M%S', self.last_rev_time)
        if self.edit_time:
            data['starttimestamp'] = time.strftime('%Y%m%d%H%M%S', self.edit_time)
        if bot:
            data['bot'] = '1'
        if section:
            data['section'] = section
            result = self.site.api('edit', title=self.name, text=text,
                                   summary=summary, token=self.get_token('edit'),
                                   **data)
            if result['edit'].get('result').lower() == 'failure':
                raise errors.EditError(self, result['edit'])
        try:
            result = do_edit()
        except errors.APIError, e:
            if e.code == 'badtoken':
                # Retry, but only once to avoid an infinite loop
                try:
                    result = do_edit()
                except errors.APIError, e:
                    self.handle_edit_error(e, summary)
            else:
                self.handle_edit_error(e, summary)
        # 'newtimestamp' is not included if no change was made
        if 'newtimestamp' in result['edit'].keys():
            self.last_rev_time = client.parse_timestamp(result['edit'].get('newtimestamp'))
        return result['edit']
    def handle_edit_error(self, e, summary):
        if e.code == 'editconflict':
            raise errors.EditError(self, summary, e.info)
        elif e.code in ('protectedtitle', 'cantcreate', 'cantcreate-anon', 'noimageredirect-anon',
                        'noimageredirect', 'noedit-anon', 'noedit'):
            raise errors.ProtectedPageError(self, e.code, e.info)
        else:
    def get_expanded(self):
        self.site.require(1, 12)
        revs = self.revisions(prop='content', limit=1, expandtemplates=True)
        try:
            return revs.next()['*']
        except StopIteration:
            return u''
    def move(self, new_title, reason='', move_talk=True, no_redirect=False):
        """Move (rename) page to new_title.
        If user account is an administrator, specify no_direct as True to not
        leave a redirect.
        If user does not have permission to move page, an InsufficientPermission
        exception is raised.
        if not self.can('move'):
            raise errors.InsufficientPermission(self)
        if not self.site.writeapi:
            return OldPage.move(self, new_title=new_title,
                                reason=reason, move_talk=move_talk)
        if move_talk:
            data['movetalk'] = '1'
        if no_redirect:
            data['noredirect'] = '1'
        result = self.site.api('move', ('from', self.name), to=new_title,
                               token=self.get_token('move'), reason=reason, **data)
        return result['move']
    def delete(self, reason='', watch=False, unwatch=False, oldimage=False):
        If user does not have permission to delete page, an InsufficientPermission
        exception is raised.
        if not self.can('delete'):
            raise errors.InsufficientPermission(self)
        if not self.site.writeapi:
            return OldPage.delete(self, reason=reason)
        if watch:
            data['watch'] = '1'
        if unwatch:
            data['unwatch'] = '1'
        if oldimage:
            data['oldimage'] = oldimage
        result = self.site.api('delete', title=self.name,
                               token=self.get_token('delete'),
                               reason=reason, **data)
        return result['delete']
    def purge(self):
        """Purge server-side cache of page. This will re-render templates and other
        dynamic content.
        self.site.raw_index('purge', title=self.name)
    # def watch: requires 1.14
    def backlinks(self, namespace=None, filterredir='all', redirect=False, limit=None, generator=True):
        self.site.require(1, 9)
        # Fix title for < 1.11 !!
        prefix = listing.List.get_prefix('bl', generator)
        kwargs = dict(listing.List.generate_kwargs(prefix,
                                                   namespace=namespace, filterredir=filterredir))
        if redirect:
            kwargs['%sredirect' % prefix] = '1'
        kwargs[compatibility.title(prefix, self.site.require(1, 11, raise_error=False))] = self.name
        return listing.List.get_list(generator)(self.site, 'backlinks', 'bl', limit=limit, return_values='title', **kwargs)
    def categories(self, generator=True):
        self.site.require(1, 11)
        if generator:
            return listing.PagePropertyGenerator(self, 'categories', 'cl')
        else:
            # TODO: return sortkey if wanted
            return listing.PageProperty(self, 'categories', 'cl', return_values='title')
    def embeddedin(self, namespace=None, filterredir='all', redirect=False, limit=None, generator=True):
        self.site.require(1, 9)
        # Fix title for < 1.11 !!
        prefix = listing.List.get_prefix('ei', generator)
        kwargs = dict(listing.List.generate_kwargs(prefix,
                                                   namespace=namespace, filterredir=filterredir))
        if redirect:
            kwargs['%sredirect' % prefix] = '1'
        kwargs[compatibility.title(prefix, self.site.require(1, 11, raise_error=False))] = self.name
        return listing.List.get_list(generator)(self.site, 'embeddedin', 'ei', limit=limit, return_values='title', **kwargs)
    def extlinks(self):
        self.site.require(1, 11)
        return listing.PageProperty(self, 'extlinks', 'el', return_values='*')
        self.site.require(1, 9)
        if generator:
            return listing.PagePropertyGenerator(self, 'images', '')
        else:
            return listing.PageProperty(self, 'images', '', return_values='title')
    def iwlinks(self):
        self.site.require(1, 9)  # guessing...
        return listing.PageProperty(self, 'iwlinks', 'iw', return_values=('prefix', '*'))

    def langlinks(self, **kwargs):
        self.site.require(1, 9)
        return listing.PageProperty(self, 'langlinks', 'll', return_values=('lang', '*'), **kwargs)
    def links(self, namespace=None, generator=True, redirects=False):
        self.site.require(1, 9)
        kwargs = dict(listing.List.generate_kwargs('pl', namespace=namespace))
        if redirects:
            kwargs['redirects'] = '1'
        if generator:
            return listing.PagePropertyGenerator(self, 'links', 'pl', **kwargs)
        else:
            return listing.PageProperty(self, 'links', 'pl', return_values='title', **kwargs)
    def revisions(self, startid=None, endid=None, start=None, end=None,
                  dir='older', user=None, excludeuser=None, limit=50,
                  prop='ids|timestamp|flags|comment|user', expandtemplates=False, section=None):
        self.site.require(1, 8)
        kwargs = dict(listing.List.generate_kwargs('rv', startid=startid, endid=endid,
                                                   start=start, end=end, user=user, excludeuser=excludeuser))
        kwargs['rvdir'] = dir
        kwargs['rvprop'] = prop
        if expandtemplates:
            kwargs['rvexpandtemplates'] = '1'
        if section:
            kwargs['rvsection'] = section
        return listing.RevisionsIterator(self, 'revisions', 'rv', limit=limit, **kwargs)
    def templates(self, namespace=None, generator=True):
        self.site.require(1, 8)
        kwargs = dict(listing.List.generate_kwargs('tl', namespace=namespace))
        if generator:
            return listing.PagePropertyGenerator(self, 'templates', 'tl')
        else:
            return listing.PageProperty(self, 'templates', 'tl', return_values='title')
class Image(Page):
    def __init__(self, site, name, info=None):
        site.require(1, 11)
        Page.__init__(self, site, name, info,
                      extra_properties={'imageinfo': (('iiprop',
                                                       compatibility.iiprop(site.version)), )})
        self.imagerepository = self._info.get('imagerepository', '')
        self.imageinfo = self._info.get('imageinfo', ({}, ))[0]
    def imagehistory(self):
        return listing.PageProperty(self, 'imageinfo', 'ii',
                                    iiprop=compatibility.iiprop(self.site.version))
    def imageusage(self, namespace=None, filterredir='all', redirect=False,
                   limit=None, generator=True):
        self.site.require(1, 11)
        # TODO: Fix for versions < 1.11
        prefix = listing.List.get_prefix('iu', generator)
        kwargs = dict(listing.List.generate_kwargs(prefix, title=self.name,
                                                   namespace=namespace, filterredir=filterredir))
        if redirect:
            kwargs['%sredirect' % prefix] = '1'
        return listing.List.get_list(generator)(self.site, 'imageusage', 'iu',
                                                limit=limit, return_values='title', **kwargs)
    def duplicatefiles(self, limit=None):
        self.require(1, 14)
        return listing.PageProperty(self, 'duplicatefiles', 'df',
                                    dflimit=limit)
    def download(self):
        url = self.imageinfo['url']
        if not url.startswith('http://'):
            url = 'http://' + self.site.host + url
        url = urlparse.urlparse(url)
        # TODO: query string
        return self.site.connection.get(url[1], url[2])
    def __repr__(self):
        return "<Image object '%s' for %s>" % (self.name.encode('utf-8'), self.site)