# -*- coding: utf-8 -*- # Copyright © 2012-2014 Roberto Alsina and others. # Permission is hereby granted, free of charge, to any # person obtaining a copy of this software and associated # documentation files (the "Software"), to deal in the # Software without restriction, including without limitation # the rights to use, copy, modify, merge, publish, # distribute, sublicense, and/or sell copies of the # Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice # shall be included in all copies or substantial portions of # the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY # KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE # WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR # PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS # OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR # OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR # OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
from __future__ import unicode_literals, print_function import os import re import sys from lxml import etree try: from urlparse import urlparse from urllib import unquote except ImportError: from urllib.parse import urlparse, unquote # NOQA try: import requests except ImportError: requests = None # NOQA try: import phpserialize except ImportError: phpserialize = None # NOQA from nikola.plugin_categories import Command from nikola import utils from nikola.utils import req_missing from nikola.plugins.basic_import import ImportMixin, links LOGGER = utils.get_logger('import_wordpress', utils.STDERR_HANDLER) class CommandImportWordpress(Command, ImportMixin): """Import a WordPress dump.""" name = "import_wordpress" needs_config = False doc_usage = "[options] wordpress_export_file" doc_purpose = "import a WordPress dump" cmd_options = ImportMixin.cmd_options + [ { 'name': 'exclude_drafts', 'long': 'no-drafts', 'short': 'd', 'default': False, 'type': bool, 'help': "Don't import drafts", }, { 'name': 'squash_newlines', 'long': 'squash-newlines', 'default': False, 'type': bool, 'help': "Shorten multiple newlines in a row to only two newlines", }, { 'name': 'no_downloads', 'long': 'no-downloads', 'default': False, 'type': bool, 'help': "Do not try to download files for the import", }, ] def _execute(self, options={}, args=[]): """Import a WordPress blog from an export file into a Nikola site.""" if not args: print(self.help()) return options['filename'] = args.pop(0) if args and ('output_folder' not in args or options['output_folder'] == 'new_site'): options['output_folder'] = args.pop(0) if args: LOGGER.warn('You specified additional arguments ({0}). 
Please consider ' 'putting these arguments before the filename if you ' 'are running into problems.'.format(args)) self.import_into_existing_site = False self.url_map = {} self.timezone = None self.wordpress_export_file = options['filename'] self.squash_newlines = options.get('squash_newlines', False) self.output_folder = options.get('output_folder', 'new_site') self.exclude_drafts = options.get('exclude_drafts', False) self.no_downloads = options.get('no_downloads', False) if not self.no_downloads: def show_info_about_mising_module(modulename): LOGGER.error( 'To use the "{commandname}" command, you have to install ' 'the "{package}" package or supply the "--no-downloads" ' 'option.'.format( commandname=self.name, package=modulename) ) if requests is None and phpserialize is None: req_missing(['requests', 'phpserialize'], 'import WordPress dumps without --no-downloads') elif requests is None: req_missing(['requests'], 'import WordPress dumps without --no-downloads') elif phpserialize is None: req_missing(['phpserialize'], 'import WordPress dumps without --no-downloads') channel = self.get_channel_from_file(self.wordpress_export_file) self.context = self.populate_context(channel) conf_template = self.generate_base_site() self.import_posts(channel) self.context['REDIRECTIONS'] = self.configure_redirections( self.url_map) self.write_urlmap_csv( os.path.join(self.output_folder, 'url_map.csv'), self.url_map) rendered_template = conf_template.render(**self.context) rendered_template = re.sub('# REDIRECTIONS = ', 'REDIRECTIONS = ', rendered_template) if self.timezone: rendered_template = re.sub('# TIMEZONE = \'UTC\'', 'TIMEZONE = \'' + self.timezone + '\'', rendered_template) self.write_configuration(self.get_configuration_output_path(), rendered_template) @classmethod def _glue_xml_lines(cls, xml): new_xml = xml[0] previous_line_ended_in_newline = new_xml.endswith(b'\n') previous_line_was_indentet = False for line in xml[1:]: if (re.match(b'^[ \t]+', line) and 
def download_additional_image_sizes(self, item, wordpress_namespace, source_path):
    """Download the extra image sizes listed in an attachment's metadata.

    Looks at the ``postmeta`` children of *item* for the
    ``_wp_attachment_metadata`` key, unserializes its value and downloads
    every file named in its ``sizes`` entry from below *source_path* into
    ``<output_folder>/files``.  The new site-relative location of each
    file is recorded in ``links``.  Does nothing when ``phpserialize`` is
    not available.
    """
    if phpserialize is None:
        return

    meta_key_tag = '{{{0}}}meta_key'.format(wordpress_namespace)
    meta_value_tag = '{{{0}}}meta_value'.format(wordpress_namespace)

    # findall() returns a (possibly empty) list, so no None check is needed.
    for element in item.findall('{{{0}}}postmeta'.format(wordpress_namespace)):
        meta_key = element.find(meta_key_tag)
        if meta_key is None or meta_key.text != '_wp_attachment_metadata':
            continue

        meta_value = element.find(meta_value_tag)
        # An absent or empty value element carries nothing to unserialize.
        if meta_value is None or meta_value.text is None:
            continue

        # WordPress stores this metadata as a serialized PHP object
        # rather than in a portable format such as JSON, hence the
        # dependency on phpserialize.
        if sys.version_info[0] == 2:
            metadata = phpserialize.loads(meta_value.text)
            size_key = 'sizes'
            file_key = 'file'
        else:
            # On Python 3 the input is passed as bytes and the keys are
            # looked up as bytes.
            metadata = phpserialize.loads(meta_value.text.encode('UTF-8'))
            size_key = b'sizes'
            file_key = b'file'

        if size_key not in metadata:
            continue

        sizes = metadata[size_key]
        for size in sizes:
            filename = sizes[size][file_key]
            url = '/'.join([source_path, filename.decode('utf-8')])

            path = urlparse(url).path
            dst_path = os.path.join(*([self.output_folder, 'files'] +
                                      list(path.split('/'))))
            dst_dir = os.path.dirname(dst_path)
            utils.makedirs(dst_dir)
            LOGGER.notice("Downloading {0} => {1}".format(url, dst_path))
            self.download_url_content_to_file(url, dst_path)
            # Drop the first two path components to get the URL the file
            # has inside the generated site.
            dst_url = '/'.join(dst_path.split(os.sep)[2:])
            links[url] = '/' + dst_url


@staticmethod
def transform_sourcecode(content):
    """Turn ``[sourcecode language="x"]...[/sourcecode]`` into fenced code.

    The opening shortcode becomes a ``~~~~~~~~~~~~{.x}`` fence and the
    closing shortcode a plain ``~~~~~~~~~~~~`` fence, each on its own line.
    """
    # Raw string: '\[' is not a valid escape sequence in a normal literal.
    new_content = re.sub(r'\[sourcecode language="([^"]+)"\]',
                         "\n~~~~~~~~~~~~{.\\1}\n", content)
    new_content = new_content.replace('[/sourcecode]', "\n~~~~~~~~~~~~\n")
    return new_content


@staticmethod
def transform_caption(content):
    """Strip ``[caption ...]`` and ``[/caption]`` shortcodes from *content*.

    Only the shortcode tags are removed; the text between them is kept.
    """
    new_caption = re.sub(r'\[/caption\]', '', content)
    # Stop at the first closing bracket.  A greedy '.*' would also delete
    # the caption body whenever another ']' follows on the same line.
    new_caption = re.sub(r'\[caption[^\]]*\]', '', new_caption)
    return new_caption


def transform_multiple_newlines(self, content):
    """Replace runs of three or more newlines with exactly two.

    Only active when ``self.squash_newlines`` is set; otherwise *content*
    is returned unchanged.
    """
    if self.squash_newlines:
        return re.sub(r'\n{3,}', r'\n\n', content)
    else:
        return content


def transform_content(self, content):
    """Apply the sourcecode, caption and newline transformations in order."""
    new_content = self.transform_sourcecode(content)
    new_content = self.transform_caption(new_content)
    new_content = self.transform_multiple_newlines(new_content)
    return new_content


def import_item(self, item, wordpress_namespace, out_folder=None):
    """Take an item from the feed and create a post file.

    The slug is derived from the item's link, falling back to its
    ``post_name`` and then its ``post_id``.  Trashed items, drafts (when
    ``self.exclude_drafts`` is set) and items without content are skipped
    with a log message.  For everything else the old link is recorded in
    ``self.url_map`` and a ``.meta`` and a ``.wp`` file are written below
    ``<output_folder>/<out_folder>`` (``posts`` by default).
    """
    if out_folder is None:
        out_folder = 'posts'

    title = get_text_tag(item, 'title', 'NO TITLE')
    # link is something like http://foo.com/2012/09/01/hello-world/
    # So, take the path, utils.slugify it, and that's our slug
    link = get_text_tag(item, 'link', None)
    # Without a link there is no path to parse; the post_name / post_id
    # fallbacks below then provide the slug.
    path = unquote(urlparse(link).path) if link else ''

    # In python 2, path is a str. slug requires a unicode
    # object. According to wikipedia, unquoted strings will
    # usually be UTF8
    if isinstance(path, utils.bytes_str):
        path = path.decode('utf8')
    slug = utils.slugify(path)
    if not slug:  # it happens if the post has no "nice" URL
        slug = get_text_tag(
            item, '{{{0}}}post_name'.format(wordpress_namespace), None)
    if not slug:  # it *may* happen
        slug = get_text_tag(
            item, '{{{0}}}post_id'.format(wordpress_namespace), None)
    if not slug:  # should never happen
        # Format the title into the message; passed as a separate
        # argument it never showed up because the text has no placeholder.
        LOGGER.error('Error converting post: {0}'.format(title))
        return

    description = get_text_tag(item, 'description', '')
    post_date = get_text_tag(
        item, '{{{0}}}post_date'.format(wordpress_namespace), None)
    dt = utils.to_datetime(post_date)
    # The first timezone-aware post date decides the site's timezone.
    if dt.tzinfo and self.timezone is None:
        self.timezone = utils.get_tzname(dt)
    status = get_text_tag(
        item, '{{{0}}}status'.format(wordpress_namespace), 'publish')
    # An element that is present but empty has no text; treat it as ''.
    content = get_text_tag(
        item, '{http://purl.org/rss/1.0/modules/content/}encoded', '') or ''

    tags = []
    if status == 'trash':
        LOGGER.warn('Trashed post "{0}" will not be imported.'.format(title))
        return
    elif status != 'publish':
        tags.append('draft')
        is_draft = True
    else:
        is_draft = False

    for tag in item.findall('category'):
        text = tag.text
        # Skip WordPress' default category and categories without a name.
        if text is None or text == 'Uncategorized':
            continue
        tags.append(text)

    if is_draft and self.exclude_drafts:
        LOGGER.notice('Draft "{0}" will not be imported.'.format(title))
    elif content.strip():
        # If no content is found, no files are written.
        if link:
            self.url_map[link] = self.context['SITE_URL'] + '/' + \
                out_folder + '/' + slug + '.html'

        content = self.transform_content(content)

        self.write_metadata(os.path.join(self.output_folder, out_folder,
                                         slug + '.meta'),
                            title, slug, post_date, description, tags)
        self.write_content(
            os.path.join(self.output_folder, out_folder, slug + '.wp'),
            content)
    else:
        LOGGER.warn('Not going to import "{0}" because it seems to contain'
                    ' no content.'.format(title))
def process_item(self, item):
    """Hand one feed item to the importer that matches its post type.

    Attachments go to ``import_attachment``; posts are imported into
    ``posts`` and every other post type into ``stories``.
    """
    # The namespace usually is something like:
    # http://wordpress.org/export/1.2/
    wordpress_namespace = item.nsmap['wp']
    post_type = get_text_tag(
        item, '{{{0}}}post_type'.format(wordpress_namespace), 'post')

    if post_type == 'attachment':
        self.import_attachment(item, wordpress_namespace)
    elif post_type == 'post':
        self.import_item(item, wordpress_namespace, 'posts')
    else:
        self.import_item(item, wordpress_namespace, 'stories')


def import_posts(self, channel):
    """Process every ``item`` child of *channel*."""
    for item in channel.findall('item'):
        self.process_item(item)


def get_text_tag(tag, name, default):
    """Return the text of the child *name* of *tag*, or *default*.

    *default* is returned when *tag* is None, when it has no such child,
    or when the child exists but has no text (an empty element), so that
    callers never receive None unless they asked for it as the default.
    """
    if tag is None:
        return default
    t = tag.find(name)
    if t is None or t.text is None:
        return default
    return t.text