# Copyright (c) 2012 Roberto Alsina y otros.

# Permission is hereby granted, free of charge, to any
# person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the
# Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the
# Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice
# shall be included in all copies or substantial portions of
# the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
# KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
# PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS
# OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR
# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

from __future__ import unicode_literals, print_function
import codecs
import csv
import datetime
import os
import re
try:
    from urlparse import urlparse
except ImportError:
    from urllib.parse import urlparse  # NOQA

from lxml import etree, html
from mako.template import Template

try:
    import requests
except ImportError:
    requests = None  # NOQA

from nikola.plugin_categories import Command
from nikola import utils

# Maps original attachment URLs/links to their new site-relative location.
# Filled by import_attachment() and consulted by replacer() when the links
# inside imported post bodies are rewritten.
links = {}


class CommandImportWordpress(Command):
    """Import a wordpress dump."""

    name = "import_wordpress"
    needs_config = False
    doc_usage = "[options] wordpress_export_file"
    doc_purpose = "Import a wordpress dump."
    cmd_options = [
        {
            'name': 'output_folder',
            'long': 'output-folder',
            'short': 'o',
            'default': 'new_site',
            'help': 'Location to write imported content.'
        },
        {
            'name': 'exclude_drafts',
            'long': 'no-drafts',
            'short': 'd',
            'default': False,
            'type': bool,
            'help': "Don't import drafts",
        },
        {
            'name': 'squash_newlines',
            'long': 'squash-newlines',
            'default': False,
            'type': bool,
            'help': "Shorten multiple newlines in a row to only two newlines",
        },
        {
            'name': 'no_downloads',
            'long': 'no-downloads',
            'default': False,
            'type': bool,
            'help': "Do not try to download files for the import",
        },
    ]

    def _execute(self, options=None, args=None):
        """Import a Wordpress blog from an export file into a Nikola site.

        ``args[0]`` is the path of the export file; an optional ``args[1]``
        overrides the ``output_folder`` option.  Prints the command help and
        returns when no export file is given, and returns early with a
        message when the ``requests`` package is not installed.
        """
        # None sentinels instead of mutable defaults: the options dict is
        # written to below, which would otherwise leak state between calls.
        if options is None:
            options = {}
        if args is None:
            args = []

        # Parse the data
        print(options, args)
        if requests is None:
            print('To use the import_wordpress command,'
                  ' you have to install the "requests" package.')
            return

        if not args:
            print(self.help())
            return

        options['filename'] = args[0]

        if len(args) > 1:
            options['output_folder'] = args[1]

        self.wordpress_export_file = options['filename']
        self.squash_newlines = options.get('squash_newlines', False)
        self.no_downloads = options.get('no_downloads', False)
        self.output_folder = options.get('output_folder', 'new_site')
        self.import_into_existing_site = False
        self.exclude_drafts = options.get('exclude_drafts', False)
        self.url_map = {}
        channel = self.get_channel_from_file(self.wordpress_export_file)
        self.context = self.populate_context(channel)
        conf_template = self.generate_base_site()

        self.import_posts(channel)

        # url_map is only complete once every item has been imported.
        self.context['REDIRECTIONS'] = self.configure_redirections(
            self.url_map)
        self.write_urlmap_csv(
            os.path.join(self.output_folder, 'url_map.csv'), self.url_map)
        rendered_template = conf_template.render(**self.context)
        # Activate the REDIRECTIONS setting that is commented out in the
        # rendered configuration.
        rendered_template = re.sub('# REDIRECTIONS = ', 'REDIRECTIONS = ',
                                   rendered_template)
        self.write_configuration(self.get_configuration_output_path(),
                                 rendered_template)

    @classmethod
    def _glue_xml_lines(cls, xml):
        """Join a list of byte lines into a single bytes object.

        A line that starts with a space or tab and follows a line ending in
        a newline is appended directly, and so is the line right after such
        an indented line.  Every other line is attached with an additional
        ``b'\\n'`` separator.  Returns ``b''`` for an empty list.
        """
        if not xml:
            return b''

        # Collect the pieces and join once; re-joining the growing result
        # for every line is quadratic on large dumps.
        parts = [xml[0]]
        previous_ended_in_newline = xml[0].endswith(b'\n')
        previous_was_indented = False
        for line in xml[1:]:
            is_indented = bool(previous_ended_in_newline
                               and re.match(b'^[ \t]+', line))
            if not (is_indented or previous_was_indented):
                parts.append(b'\n')
            parts.append(line)
            previous_was_indented = is_indented
            previous_ended_in_newline = line.endswith(b'\n')

        return b''.join(parts)

    # NOTE(review): the source under review was truncated between the filter
    # condition of read_xml_file() and the "Downloading" message of
    # import_attachment().  The filter pattern and tail of read_xml_file(),
    # all of get_channel_from_file(), configure_redirections(),
    # generate_base_site(), populate_context() and
    # download_url_content_to_file(), and the head of import_attachment()
    # are reconstructions guided by how the visible code calls them.
    # Verify every one of them against the original file before merging.

    @classmethod
    def read_xml_file(cls, filename):
        """Read the export file as bytes and return it as one glued blob."""
        xml = []

        with open(filename, 'rb') as fd:
            for line in fd:
                # These explode etree and are useless
                # NOTE(review): pattern reconstructed, only "b'" survived.
                if b'<atom:link rel=' in line:
                    continue
                xml.append(line)

        return cls._glue_xml_lines(xml)

    @classmethod
    def get_channel_from_file(cls, filename):
        """Parse the export file and return its ``channel`` element.

        NOTE(review): reconstructed, confirm against the original file.
        """
        tree = etree.fromstring(cls.read_xml_file(filename))
        return tree.find('channel')

    @staticmethod
    def configure_redirections(url_map):
        """Build ``(source file, destination path)`` pairs from ``url_map``.

        NOTE(review): reconstructed, confirm against the original file.
        """
        redirections = []
        for k, v in url_map.items():
            # remove the initial "/" because src is a relative file path
            src = (urlparse(k).path + 'index.html')[1:]
            dst = (urlparse(v).path)
            if src == 'index.html':
                print("Can't do a redirect for: {0!r}".format(k))
            else:
                redirections.append((src, dst))

        return redirections

    def generate_base_site(self):
        """Create the site skeleton if needed and return the conf template.

        NOTE(review): reconstructed, confirm against the original file.
        """
        if not os.path.exists(self.output_folder):
            os.system('nikola init --empty ' + self.output_folder)
        else:
            self.import_into_existing_site = True
            print('The folder {0} already exists - assuming that this is a '
                  'already existing nikola site.'.format(self.output_folder))

        conf_template = Template(filename=os.path.join(
            os.path.dirname(utils.__file__), 'conf.py.in'))

        return conf_template

    @staticmethod
    def populate_context(channel):
        """Collect the site-wide settings used to render the configuration.

        NOTE(review): reconstructed, confirm keys and default strings
        against the original file.
        """
        wordpress_namespace = channel.nsmap['wp']

        context = {}
        context['DEFAULT_LANG'] = get_text_tag(channel, 'language', 'en')[:2]
        context['BLOG_TITLE'] = get_text_tag(
            channel, 'title', 'PUT TITLE HERE')
        context['BLOG_DESCRIPTION'] = get_text_tag(
            channel, 'description', 'PUT DESCRIPTION HERE')
        context['SITE_URL'] = get_text_tag(channel, 'link', '#')
        author = channel.find('{{{0}}}author'.format(wordpress_namespace))
        context['BLOG_EMAIL'] = get_text_tag(
            author,
            '{{{0}}}author_email'.format(wordpress_namespace),
            "joe@example.com")
        context['BLOG_AUTHOR'] = get_text_tag(
            author,
            '{{{0}}}author_display_name'.format(wordpress_namespace),
            "Joe Example")
        context['POST_PAGES'] = '''(
            ("posts/*.wp", "posts", "post.tmpl", True),
            ("stories/*.wp", "stories", "story.tmpl", False),
        )'''
        context['POST_COMPILERS'] = '''{
        "rest": ('.txt', '.rst'),
        "markdown": ('.md', '.mdown', '.markdown', '.wp'),
        "html": ('.html', '.htm')
        }
        '''

        return context

    def download_url_content_to_file(self, url, dst_path):
        """Download ``url`` into ``dst_path`` unless downloads are disabled.

        NOTE(review): reconstructed, confirm against the original file.
        """
        if self.no_downloads:
            return

        try:
            with open(dst_path, 'wb+') as fd:
                fd.write(requests.get(url).content)
        except requests.exceptions.ConnectionError as err:
            print("Downloading {0} to {1} failed: {2}".format(url, dst_path,
                                                              err))

    def import_attachment(self, item, wordpress_namespace):
        """Download an attachment and register its new location in ``links``.

        NOTE(review): everything above the "Downloading" message is
        reconstructed, confirm against the original file.
        """
        url = get_text_tag(
            item, '{{{0}}}attachment_url'.format(wordpress_namespace), 'foo')
        link = get_text_tag(item, '{{{0}}}link'.format(wordpress_namespace),
                            'foo')
        path = urlparse(url).path
        dst_path = os.path.join(*([self.output_folder, 'files']
                                  + list(path.split('/'))))
        dst_dir = os.path.dirname(dst_path)
        if not os.path.isdir(dst_dir):
            os.makedirs(dst_dir)
        print("Downloading {0} => {1}".format(url, dst_path))
        self.download_url_content_to_file(url, dst_path)
        # Drop the first two path components to get the site-relative URL.
        dst_url = '/'.join(dst_path.split(os.sep)[2:])
        links[link] = '/' + dst_url
        links[url] = '/' + dst_url

    @staticmethod
    def transform_sourcecode(content):
        """Turn ``[sourcecode language="x"]`` blocks into ``~~~`` fences."""
        new_content = re.sub(r'\[sourcecode language="([^"]+)"\]',
                             "\n~~~~~~~~~~~~{.\\1}\n", content)
        new_content = new_content.replace('[/sourcecode]',
                                          "\n~~~~~~~~~~~~\n")
        return new_content

    @staticmethod
    def transform_caption(content):
        """Strip ``[caption ...]`` / ``[/caption]`` tags, keeping their body."""
        new_caption = re.sub(r'\[/caption\]', '', content)
        # Non-greedy: stop at the tag's own closing bracket so that text up
        # to a later "]" on the same line is not deleted with it.
        new_caption = re.sub(r'\[caption.*?\]', '', new_caption)
        return new_caption

    def transform_multiple_newlines(self, content):
        """Replaces multiple newlines with only two."""
        if self.squash_newlines:
            return re.sub(r'\n{3,}', r'\n\n', content)
        else:
            return content

    def transform_content(self, content):
        """Apply the sourcecode, caption and newline transformations."""
        new_content = self.transform_sourcecode(content)
        new_content = self.transform_caption(new_content)
        new_content = self.transform_multiple_newlines(new_content)
        return new_content

    @classmethod
    def write_content(cls, filename, content):
        """Write ``content`` to ``filename`` as UTF-8 encoded HTML.

        Links found in the document are first passed through ``replacer``.
        """
        doc = html.document_fromstring(content)
        doc.rewrite_links(replacer)

        with open(filename, "wb+") as fd:
            fd.write(html.tostring(doc, encoding='utf8'))

    @staticmethod
    def write_metadata(filename, title, slug, post_date, description, tags):
        """Write the metadata file: one field per line, tags comma-joined.

        A blank line separates the tags from the description; a falsy
        description is written as an empty line.
        """
        if not description:
            description = ""

        with codecs.open(filename, "w+", "utf8") as fd:
            fd.write('{0}\n'.format(title))
            fd.write('{0}\n'.format(slug))
            fd.write('{0}\n'.format(post_date))
            fd.write('{0}\n'.format(','.join(tags)))
            fd.write('\n')
            fd.write('{0}\n'.format(description))

    def import_item(self, item, wordpress_namespace, out_folder=None):
        """Takes an item from the feed and creates a post file.

        Writes ``<slug>.meta`` and ``<slug>.wp`` below
        ``output_folder/out_folder`` (``'posts'`` when ``out_folder`` is
        None) and records the old link in ``self.url_map``.  Nothing is
        written for items without a usable slug, items without content, or
        drafts when ``exclude_drafts`` is set.
        """
        if out_folder is None:
            out_folder = 'posts'

        title = get_text_tag(item, 'title', 'NO TITLE')
        # link is something like http://foo.com/2012/09/01/hello-world/
        # So, take the path, utils.slugify it, and that's our slug
        link = get_text_tag(item, 'link', None)
        slug = utils.slugify(urlparse(link).path)
        if not slug:  # it happens if the post has no "nice" URL
            slug = get_text_tag(
                item, '{{{0}}}post_name'.format(wordpress_namespace), None)
        if not slug:  # it *may* happen
            slug = get_text_tag(
                item, '{{{0}}}post_id'.format(wordpress_namespace), None)
        if not slug:  # should never happen
            print("Error converting post:", title)
            return

        description = get_text_tag(item, 'description', '')
        post_date = get_text_tag(
            item, '{{{0}}}post_date'.format(wordpress_namespace), None)
        status = get_text_tag(
            item, '{{{0}}}status'.format(wordpress_namespace), 'publish')
        content = get_text_tag(
            item, '{http://purl.org/rss/1.0/modules/content/}encoded', '')

        tags = []
        if status != 'publish':
            tags.append('draft')
            is_draft = True
        else:
            is_draft = False

        for tag in item.findall('category'):
            text = tag.text
            # Empty category elements have no text; a None entry would make
            # the tag join in write_metadata() fail.
            if not text or text == 'Uncategorized':
                continue
            tags.append(text)

        if is_draft and self.exclude_drafts:
            print('Draft "{0}" will not be imported.'.format(title))
        elif content.strip():
            # If no content is found, no files are written.
            self.url_map[link] = self.context['SITE_URL'] + '/' + \
                out_folder + '/' + slug + '.html'

            content = self.transform_content(content)

            self.write_metadata(os.path.join(self.output_folder, out_folder,
                                             slug + '.meta'),
                                title, slug, post_date, description, tags)
            self.write_content(
                os.path.join(self.output_folder, out_folder, slug + '.wp'),
                content)
        else:
            print('Not going to import "{0}" because it seems to contain'
                  ' no content.'.format(title))

    def process_item(self, item):
        """Dispatch one feed item according to its ``post_type``.

        Attachments go to import_attachment(), posts are imported into
        ``posts`` and every other type into ``stories``.
        """
        # The namespace usually is something like:
        # http://wordpress.org/export/1.2/
        wordpress_namespace = item.nsmap['wp']
        post_type = get_text_tag(
            item, '{{{0}}}post_type'.format(wordpress_namespace), 'post')

        if post_type == 'attachment':
            self.import_attachment(item, wordpress_namespace)
        elif post_type == 'post':
            self.import_item(item, wordpress_namespace, 'posts')
        else:
            self.import_item(item, wordpress_namespace, 'stories')

    def import_posts(self, channel):
        """Process every ``item`` element found directly under ``channel``."""
        for item in channel.findall('item'):
            self.process_item(item)

    @staticmethod
    def write_urlmap_csv(output_file, url_map):
        """Write ``url_map`` to ``output_file`` as one CSV row per entry."""
        with codecs.open(output_file, 'w+', 'utf8') as fd:
            csv_writer = csv.writer(fd)
            for item in url_map.items():
                csv_writer.writerow(item)

    def get_configuration_output_path(self):
        """Return (and announce) where the generated configuration goes.

        A new site gets ``conf.py``; when importing into an existing site a
        timestamped file name is used instead.
        """
        if not self.import_into_existing_site:
            filename = 'conf.py'
        else:
            # %S (seconds); the previous %s is a platform-specific
            # extension that is not available everywhere.
            filename = 'conf.py.wordpress_import-{0}'.format(
                datetime.datetime.now().strftime('%Y%m%d_%H%M%S'))
        config_output_path = os.path.join(self.output_folder, filename)
        print('Configuration will be written to:', config_output_path)

        return config_output_path

    @staticmethod
    def write_configuration(filename, rendered_template):
        """Write the rendered configuration to ``filename`` as UTF-8."""
        with codecs.open(filename, 'w+', 'utf8') as fd:
            fd.write(rendered_template)


def replacer(dst):
    """Return the replacement recorded in ``links`` for ``dst``, else ``dst``."""
    return links.get(dst, dst)


def get_text_tag(tag, name, default):
    """Return the text of the sub-element ``name`` of ``tag``.

    ``default`` is returned when ``tag`` is None, when the sub-element does
    not exist, or when it exists but has no text.
    """
    if tag is None:
        return default
    t = tag.find(name)
    # An existing but empty element has text None; callers rely on getting
    # the default (usually a string) back in that case.
    if t is not None and t.text is not None:
        return t.text
    else:
        return default