diff options
| author | 2012-12-12 19:58:42 -0300 | |
|---|---|---|
| committer | 2012-12-12 19:58:42 -0300 | |
| commit | ca1f5a392261a7c6b82b5ac1015427605909d8c9 (patch) | |
| tree | f91146c9340c6c78e84aaf6b92053386397e2069 /nikola/utils.py | |
Imported Upstream version 4.0.3upstream/4.0.3
Diffstat (limited to 'nikola/utils.py')
| -rw-r--r-- | nikola/utils.py | 466 |
1 files changed, 466 insertions, 0 deletions
diff --git a/nikola/utils.py b/nikola/utils.py new file mode 100644 index 0000000..42e0c05 --- /dev/null +++ b/nikola/utils.py @@ -0,0 +1,466 @@ +"""Utility functions.""" + +from collections import defaultdict +import cPickle +import datetime +import hashlib +import os +import re +import codecs +import shutil +import string +import subprocess +import sys +from zipfile import ZipFile as zip + +from unidecode import unidecode + +import PyRSS2Gen as rss + +__all__ = ['get_theme_path', 'get_theme_chain', 'load_messages', 'copy_tree', + 'get_compile_html', 'get_template_module', 'generic_rss_renderer', + 'copy_file', 'slugify', 'unslugify', 'get_meta', 'to_datetime', + 'apply_filters', 'config_changed'] + + +class config_changed(object): + """ A copy of doit's but using pickle instead of serializing manually.""" + + def __init__(self, config): + self.config = config + + def __call__(self, task, values): + config_digest = None + if isinstance(self.config, basestring): + config_digest = self.config + elif isinstance(self.config, dict): + data = cPickle.dumps(self.config) + config_digest = hashlib.md5(data).hexdigest() + else: + raise Exception(('Invalid type of config_changed parameter got %s' + + ', must be string or dict') % (type(self.config),)) + + def _save_config(): + return {'_config_changed': config_digest} + + task.insert_action(_save_config) + last_success = values.get('_config_changed') + if last_success is None: + return False + return (last_success == config_digest) + + +def get_theme_path(theme): + """Given a theme name, returns the path where its files are located. + + Looks in ./themes and in the place where themes go when installed. + """ + dir_name = os.path.join('themes', theme) + if os.path.isdir(dir_name): + return dir_name + dir_name = os.path.join(os.path.dirname(__file__), + 'data', 'themes', theme) + if os.path.isdir(dir_name): + return dir_name + raise Exception(u"Can't find theme '%s'" % theme) + + +def re_meta(line, match): + """ re.compile for meta""" + reStr = re.compile('^%s(.*)' % re.escape(match)) + result = reStr.findall(line) + if result: + return result[0].strip() + else: + return '' + + +def get_meta(source_path): + """get post's meta from source""" + with codecs.open(source_path, "r", "utf8") as meta_file: + meta_data = meta_file.readlines(15) + title = slug = date = tags = link = description = '' + + re_md_title = re.compile(r'^%s([^%s].*)' % + (re.escape('#'), re.escape('#'))) + re_rst_title = re.compile(r'^([^%s ].*)' % re.escape(string.punctuation)) + + for meta in meta_data: + if not title: + title = re_meta(meta, '.. title:') + if not title: + if re_rst_title.findall(meta): + title = re_rst_title.findall(meta)[0] + if not title: + if re_md_title.findall(meta): + title = re_md_title.findall(meta)[0] + if not slug: + slug = re_meta(meta, '.. slug:') + if not date: + date = re_meta(meta, '.. date:') + if not tags: + tags = re_meta(meta, '.. tags:') + if not link: + link = re_meta(meta, '.. link:') + if not description: + description = re_meta(meta, '.. description:') + + # TODO: either enable or delete + #if not date: + #from datetime import datetime + #date = datetime.fromtimestamp( + # os.path.getmtime(source_path)).strftime('%Y/%m/%d %H:%M') + + return (title, slug, date, tags, link, description) + + +def get_template_engine(themes): + for theme_name in themes: + engine_path = os.path.join(get_theme_path(theme_name), 'engine') + if os.path.isfile(engine_path): + with open(engine_path) as fd: + return fd.readlines()[0].strip() + # default + return 'mako' + +def get_theme_bundles(themes): + """Given a theme chain, return the bundle definitions.""" + bundles = {} + for theme_name in themes: + bundles_path = os.path.join(get_theme_path(theme_name), 'bundles') + if os.path.isfile(bundles_path): + with open(bundles_path) as fd: + for line in fd: + name, files = line.split('=') + files = [f.strip() for f in files.split(',')] + bundles[name.strip()] = files + break + return bundles + +def get_theme_chain(theme): + """Create the full theme inheritance chain.""" + themes = [theme] + + def get_parent(theme_name): + parent_path = os.path.join(get_theme_path(theme_name), 'parent') + if os.path.isfile(parent_path): + with open(parent_path) as fd: + return fd.readlines()[0].strip() + return None + + while True: + parent = get_parent(themes[-1]) + # Avoid silly loops + if parent is None or parent in themes: + break + themes.append(parent) + return themes + + +def load_messages(themes, translations): + """ Load theme's messages into context. + + All the messages from parent themes are loaded, + and "younger" themes have priority. + """ + messages = defaultdict(dict) + for theme_name in themes[::-1]: + msg_folder = os.path.join(get_theme_path(theme_name), 'messages') + oldpath = sys.path + sys.path.insert(0, msg_folder) + for lang in translations.keys(): + # If we don't do the reload, the module is cached + translation = __import__(lang) + reload(translation) + messages[lang].update(translation.MESSAGES) + del(translation) + sys.path = oldpath + return messages + + +def copy_tree(src, dst, link_cutoff=None): + """Copy a src tree to the dst folder. + + Example: + + src = "themes/default/assets" + dst = "output/assets" + + should copy "themes/defauts/assets/foo/bar" to + "output/assets/foo/bar" + + if link_cutoff is set, then the links pointing at things + *inside* that folder will stay as links, and links + pointing *outside* that folder will be copied. + """ + ignore = set(['.svn']) + base_len = len(src.split(os.sep)) + for root, dirs, files in os.walk(src): + root_parts = root.split(os.sep) + if set(root_parts) & ignore: + continue + dst_dir = os.path.join(dst, *root_parts[base_len:]) + if not os.path.isdir(dst_dir): + os.makedirs(dst_dir) + for src_name in files: + if src_name == '.DS_Store': + continue + dst_file = os.path.join(dst_dir, src_name) + src_file = os.path.join(root, src_name) + yield { + 'name': dst_file, + 'file_dep': [src_file], + 'targets': [dst_file], + 'actions': [(copy_file, (src_file, dst_file, link_cutoff))], + 'clean': True, + } + + +def get_compile_html(input_format): + """Setup input format library.""" + if input_format == "rest": + import rest + compile_html = rest.compile_html + elif input_format == "markdown": + import md + compile_html = md.compile_html + elif input_format == "html": + compile_html = copy_file + return compile_html + + +class CompileHtmlGetter(object): + """Get the correct compile_html for a file, based on file extension. + + This class exists to provide a closure for its `__call__` method. + """ + def __init__(self, post_compilers): + """Store post_compilers for use by `__call__`. + + See the structure of `post_compilers` in conf.py + """ + self.post_compilers = post_compilers + self.inverse_post_compilers = {} + + def __call__(self, source_name): + """Get the correct compiler for a post from `conf.post_compilers` + + To make things easier for users, the mapping in conf.py is + compiler->[extensions], although this is less convenient for us. The + majority of this function is reversing that dictionary and error + checking. + """ + ext = os.path.splitext(source_name)[1] + try: + compile_html = self.inverse_post_compilers[ext] + except KeyError: + # Find the correct compiler for this files extension + langs = [lang for lang, exts in + self.post_compilers.items() + if ext in exts] + if len(langs) != 1: + if len(set(langs)) > 1: + exit("Your file extension->compiler definition is" + "ambiguous.\nPlease remove one of the file extensions" + "from 'post_compilers' in conf.py\n(The error is in" + "one of %s)" % ', '.join(langs)) + elif len(langs) > 1: + langs = langs[:1] + else: + exit("post_compilers in conf.py does not tell me how to " + "handle '%s' extensions." % ext) + + lang = langs[0] + compile_html = get_compile_html(lang) + + self.inverse_post_compilers[ext] = compile_html + + return compile_html + + +def get_template_module(template_engine, themes): + """Setup templating library.""" + templates_module = None + if template_engine == "mako": + import mako_templates + templates_module = mako_templates + elif template_engine == "jinja": + import jinja_templates + templates_module = jinja_templates + templates_module.lookup = \ + templates_module.get_template_lookup( + [os.path.join(get_theme_path(name), "templates") + for name in themes]) + return templates_module + + +def generic_rss_renderer(lang, title, link, description, + timeline, output_path): + """Takes all necessary data, and renders a RSS feed in output_path.""" + items = [] + for post in timeline[:10]: + args = { + 'title': post.title(lang), + 'link': post.permalink(lang), + 'description': post.text(lang, teaser_only=True), + 'guid': post.permalink(lang), + 'pubDate': post.date, + } + items.append(rss.RSSItem(**args)) + rss_obj = rss.RSS2( + title=title, + link=link, + description=description, + lastBuildDate=datetime.datetime.now(), + items=items, + generator='nikola', + ) + dst_dir = os.path.dirname(output_path) + if not os.path.isdir(dst_dir): + os.makedirs(dst_dir) + with open(output_path, "wb+") as rss_file: + rss_obj.write_xml(rss_file) + + +def copy_file(source, dest, cutoff=None): + dst_dir = os.path.dirname(dest) + if not os.path.isdir(dst_dir): + os.makedirs(dst_dir) + if os.path.islink(source): + link_target = os.path.relpath( + os.path.normpath(os.path.join(dst_dir, os.readlink(source)))) + # Now we have to decide if we copy the link target or the + # link itself. + if cutoff is None or not link_target.startswith(cutoff): + # We copy + shutil.copy2(source, dest) + else: + # We link + if os.path.exists(dest) or os.path.islink(dest): + os.unlink(dest) + os.symlink(os.readlink(source), dest) + else: + shutil.copy2(source, dest) + + +def remove_file(source): + if os.path.isdir(source): + shutil.rmtree(source) + elif os.path.isfile(source) or os.path.islink(source): + os.remove(source) + +# slugify is copied from +# http://code.activestate.com/recipes/ +# 577257-slugify-make-a-string-usable-in-a-url-or-filename/ +_slugify_strip_re = re.compile(r'[^\w\s-]') +_slugify_hyphenate_re = re.compile(r'[-\s]+') + + +def slugify(value): + """ + Normalizes string, converts to lowercase, removes non-alpha characters, + and converts spaces to hyphens. + + From Django's "django/template/defaultfilters.py". + """ + value = unidecode(value) + value = unicode(_slugify_strip_re.sub('', value).strip().lower()) + return _slugify_hyphenate_re.sub('-', value) + + +def unslugify(value): + """ + Given a slug string (as a filename), return a human readable string + """ + value = re.sub('^[0-9]', '', value) + value = re.sub('([_\-\.])', ' ', value) + value = value.strip().capitalize() + return value + + +# A very slightly safer version of zip.extractall that works on +# python < 2.6 + +class UnsafeZipException(Exception): + pass + + +def extract_all(zipfile): + pwd = os.getcwd() + os.chdir('themes') + z = zip(zipfile) + namelist = z.namelist() + for f in namelist: + if f.endswith('/') and '..' in f: + raise UnsafeZipException( + 'The zip file contains ".." and is not safe to expand.') + for f in namelist: + if f.endswith('/'): + if not os.path.isdir(f): + try: + os.makedirs(f) + except: + raise OSError("mkdir '%s' error!" % f) + else: + z.extract(f) + os.chdir(pwd) + + +# From https://github.com/lepture/liquidluck/blob/develop/liquidluck/utils.py +def to_datetime(value): + if isinstance(value, datetime.datetime): + return value + supported_formats = [ + '%Y/%m/%d %H:%M', + '%Y/%m/%d %H:%M:%S', + '%Y/%m/%d %I:%M:%S %p', + '%a %b %d %H:%M:%S %Y', + '%Y-%m-%d %H:%M:%S', + '%Y-%m-%d %H:%M', + '%Y-%m-%dT%H:%M', + '%Y%m%d %H:%M:%S', + '%Y%m%d %H:%M', + '%Y-%m-%d', + '%Y%m%d', + ] + for format in supported_formats: + try: + return datetime.datetime.strptime(value, format) + except ValueError: + pass + raise ValueError('Unrecognized date/time: %r' % value) + + +def apply_filters(task, filters): + """ + Given a task, checks its targets. + If any of the targets has a filter that matches, + adds the filter commands to the commands of the task, + and the filter itself to the uptodate of the task. + """ + + def filter_matches(ext): + for key, value in filters.items(): + if isinstance(key, (tuple, list)): + if ext in key: + return value + elif isinstance(key, (str, unicode)): + if ext == key: + return value + else: + assert False, key + + for target in task['targets']: + ext = os.path.splitext(target)[-1].lower() + filter_ = filter_matches(ext) + if filter_: + for action in filter_: + def unlessLink(action, target): + if not os.path.islink(target): + if callable(action): + action(target) + else: + subprocess.check_call(action % target, shell=True) + + task['actions'].append((unlessLink, (action, target))) + return task |
