aboutsummaryrefslogtreecommitdiffstats
path: root/nikola/plugins/command/import_wordpress.py
diff options
context:
space:
mode:
authorLibravatarAgustin Henze <tin@sluc.org.ar>2015-08-26 07:57:23 -0300
committerLibravatarAgustin Henze <tin@sluc.org.ar>2015-08-26 07:57:23 -0300
commit70ceb871117ca811d63cb02671dc0fefc2700883 (patch)
tree846133ea39797d2cd1101cff2ac0818167353490 /nikola/plugins/command/import_wordpress.py
parent8559119e2f45b7f6508282962c0430423bfab051 (diff)
parent787b97a4cb24330b36f11297c6d3a7a473a907d0 (diff)
Merge tag 'upstream/7.6.4'
Upstream version 7.6.4
Diffstat (limited to 'nikola/plugins/command/import_wordpress.py')
-rw-r--r--nikola/plugins/command/import_wordpress.py666
1 files changed, 549 insertions, 117 deletions
diff --git a/nikola/plugins/command/import_wordpress.py b/nikola/plugins/command/import_wordpress.py
index 674fc2a..a652ec8 100644
--- a/nikola/plugins/command/import_wordpress.py
+++ b/nikola/plugins/command/import_wordpress.py
@@ -24,13 +24,18 @@
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+"""Import a WordPress dump."""
+
from __future__ import unicode_literals, print_function
import os
import re
import sys
import datetime
+import io
+import json
import requests
from lxml import etree
+from collections import defaultdict
try:
from urlparse import urlparse
@@ -53,7 +58,37 @@ from nikola.plugins.command.init import SAMPLE_CONF, prepare_config, format_defa
LOGGER = utils.get_logger('import_wordpress', utils.STDERR_HANDLER)
+def install_plugin(site, plugin_name, output_dir=None, show_install_notes=False):
+ """Install a Nikola plugin."""
+ LOGGER.notice("Installing plugin '{0}'".format(plugin_name))
+ # Get hold of the 'plugin' plugin
+ plugin_installer_info = site.plugin_manager.getPluginByName('plugin', 'Command')
+ if plugin_installer_info is None:
+ LOGGER.error('Internal error: cannot find the "plugin" plugin which is supposed to come with Nikola!')
+ return False
+ if not plugin_installer_info.is_activated:
+ # Someone might have disabled the plugin in the `conf.py` used
+ site.plugin_manager.activatePluginByName(plugin_installer_info.name)
+ plugin_installer_info.plugin_object.set_site(site)
+ plugin_installer = plugin_installer_info.plugin_object
+ # Try to install the requested plugin
+ options = {}
+ for option in plugin_installer.cmd_options:
+ options[option['name']] = option['default']
+ options['install'] = plugin_name
+ options['output_dir'] = output_dir
+ options['show_install_notes'] = show_install_notes
+ if plugin_installer.execute(options=options) > 0:
+ return False
+ # Let the plugin manager find newly installed plugins
+ site.plugin_manager.collectPlugins()
+ # Re-scan for compiler extensions
+ site.compiler_extensions = site._activate_plugins_of_category("CompilerExtension")
+ return True
+
+
class CommandImportWordpress(Command, ImportMixin):
+
"""Import a WordPress dump."""
name = "import_wordpress"
@@ -70,6 +105,20 @@ class CommandImportWordpress(Command, ImportMixin):
'help': "Don't import drafts",
},
{
+ 'name': 'exclude_privates',
+ 'long': 'exclude-privates',
+ 'default': False,
+ 'type': bool,
+ 'help': "Don't import private posts",
+ },
+ {
+ 'name': 'include_empty_items',
+ 'long': 'include-empty-items',
+ 'default': False,
+ 'type': bool,
+ 'help': "Include empty posts and pages",
+ },
+ {
'name': 'squash_newlines',
'long': 'squash-newlines',
'default': False,
@@ -107,15 +156,57 @@ class CommandImportWordpress(Command, ImportMixin):
'type': str,
'help': "The pattern for translation files names",
},
+ {
+ 'name': 'export_categories_as_categories',
+ 'long': 'export-categories-as-categories',
+ 'default': False,
+ 'type': bool,
+ 'help': "Export categories as categories, instead of treating them as tags",
+ },
+ {
+ 'name': 'export_comments',
+ 'long': 'export-comments',
+ 'default': False,
+ 'type': bool,
+ 'help': "Export comments as .wpcomment files",
+ },
+ {
+ 'name': 'transform_to_html',
+ 'long': 'transform-to-html',
+ 'default': False,
+ 'type': bool,
+ 'help': "Uses WordPress page compiler to transform WordPress posts directly to HTML during import",
+ },
+ {
+ 'name': 'use_wordpress_compiler',
+ 'long': 'use-wordpress-compiler',
+ 'default': False,
+ 'type': bool,
+ 'help': "Instead of converting posts to markdown, leave them as is and use the WordPress page compiler",
+ },
+ {
+ 'name': 'install_wordpress_compiler',
+ 'long': 'install-wordpress-compiler',
+ 'default': False,
+ 'type': bool,
+ 'help': "Automatically installs the WordPress page compiler (either locally or in the new site) if required by other options.\nWarning: the compiler is GPL software!",
+ },
]
all_tags = set([])
- def _execute(self, options={}, args=[]):
- """Import a WordPress blog from an export file into a Nikola site."""
- if not args:
- print(self.help())
+ def _find_wordpress_compiler(self):
+ """Find WordPress compiler plugin."""
+ if self.wordpress_page_compiler is not None:
return
-
+ plugin_info = self.site.plugin_manager.getPluginByName('wordpress', 'PageCompiler')
+ if plugin_info is not None:
+ if not plugin_info.is_activated:
+ self.site.plugin_manager.activatePluginByName(plugin_info.name)
+ plugin_info.plugin_object.set_site(self.site)
+ self.wordpress_page_compiler = plugin_info.plugin_object
+
+ def _read_options(self, options, args):
+ """Read command-line options."""
options['filename'] = args.pop(0)
if args and ('output_folder' not in args or
@@ -136,19 +227,76 @@ class CommandImportWordpress(Command, ImportMixin):
self.output_folder = options.get('output_folder', 'new_site')
self.exclude_drafts = options.get('exclude_drafts', False)
+ self.exclude_privates = options.get('exclude_privates', False)
self.no_downloads = options.get('no_downloads', False)
+ self.import_empty_items = options.get('include_empty_items', False)
+
+ self.export_categories_as_categories = options.get('export_categories_as_categories', False)
+ self.export_comments = options.get('export_comments', False)
+
+ self.transform_to_html = options.get('transform_to_html', False)
+ self.use_wordpress_compiler = options.get('use_wordpress_compiler', False)
+ self.install_wordpress_compiler = options.get('install_wordpress_compiler', False)
+ self.wordpress_page_compiler = None
self.auth = None
if options.get('download_auth') is not None:
username_password = options.get('download_auth')
self.auth = tuple(username_password.split(':', 1))
if len(self.auth) < 2:
- print("Please specify HTTP authentication credentials in the form username:password.")
+ LOGGER.error("Please specify HTTP authentication credentials in the form username:password.")
return False
self.separate_qtranslate_content = options.get('separate_qtranslate_content')
self.translations_pattern = options.get('translations_pattern')
+ if self.transform_to_html and self.use_wordpress_compiler:
+ LOGGER.warn("It does not make sense to combine --transform-to-html with --use-wordpress-compiler, as the first converts all posts to HTML and the latter option affects zero posts.")
+
+ if self.transform_to_html:
+ self._find_wordpress_compiler()
+ if not self.wordpress_page_compiler and self.install_wordpress_compiler:
+ if not install_plugin(self.site, 'wordpress_compiler', output_dir='plugins'): # local install
+ return False
+ self._find_wordpress_compiler()
+ if not self.wordpress_page_compiler:
+ LOGGER.error("To compile WordPress posts to HTML, the WordPress post compiler is needed. You can install it via:")
+ LOGGER.error(" nikola plugin -i wordpress_compiler")
+ LOGGER.error("Please note that the WordPress post compiler is licensed under the GPL v2.")
+ return False
+
+ return True
+
+ def _prepare(self, channel):
+ """Prepare context and category hierarchy."""
+ self.context = self.populate_context(channel)
+ self.base_dir = urlparse(self.context['BASE_URL']).path
+
+ if self.export_categories_as_categories:
+ wordpress_namespace = channel.nsmap['wp']
+ cat_map = dict()
+ for cat in channel.findall('{{{0}}}category'.format(wordpress_namespace)):
+ # cat_id = get_text_tag(cat, '{{{0}}}term_id'.format(wordpress_namespace), None)
+ cat_slug = get_text_tag(cat, '{{{0}}}category_nicename'.format(wordpress_namespace), None)
+ cat_parent_slug = get_text_tag(cat, '{{{0}}}category_parent'.format(wordpress_namespace), None)
+ cat_name = get_text_tag(cat, '{{{0}}}cat_name'.format(wordpress_namespace), None)
+ cat_path = [cat_name]
+ if cat_parent_slug in cat_map:
+ cat_path = cat_map[cat_parent_slug] + cat_path
+ cat_map[cat_slug] = cat_path
+ self._category_paths = dict()
+ for cat, path in cat_map.items():
+ self._category_paths[cat] = utils.join_hierarchical_category_path(path)
+
+ def _execute(self, options={}, args=[]):
+ """Import a WordPress blog from an export file into a Nikola site."""
+ if not args:
+ print(self.help())
+ return False
+
+ if not self._read_options(options, args):
+ return False
+
# A place holder where extra language (if detected) will be stored
self.extra_languages = set()
@@ -166,8 +314,7 @@ class CommandImportWordpress(Command, ImportMixin):
req_missing(['phpserialize'], 'import WordPress dumps without --no-downloads')
channel = self.get_channel_from_file(self.wordpress_export_file)
- self.context = self.populate_context(channel)
- self.base_dir = urlparse(self.context['BASE_URL']).path
+ self._prepare(channel)
conf_template = self.generate_base_site()
# If user has specified a custom pattern for translation files we
@@ -181,6 +328,11 @@ class CommandImportWordpress(Command, ImportMixin):
self.extra_languages)
self.context['REDIRECTIONS'] = self.configure_redirections(
self.url_map)
+ if self.timezone:
+ self.context['TIMEZONE'] = self.timezone
+ if self.export_categories_as_categories:
+ self.context['CATEGORY_ALLOW_HIERARCHIES'] = True
+ self.context['CATEGORY_OUTPUT_FLAT_HIERARCHY'] = True
# Add tag redirects
for tag in self.all_tags:
@@ -197,18 +349,21 @@ class CommandImportWordpress(Command, ImportMixin):
self.write_urlmap_csv(
os.path.join(self.output_folder, 'url_map.csv'), self.url_map)
rendered_template = conf_template.render(**prepare_config(self.context))
- rendered_template = re.sub('# REDIRECTIONS = ', 'REDIRECTIONS = ',
- rendered_template)
-
- if self.timezone:
- rendered_template = re.sub('# TIMEZONE = \'UTC\'',
- 'TIMEZONE = \'' + self.timezone + '\'',
- rendered_template)
self.write_configuration(self.get_configuration_output_path(),
rendered_template)
+ if self.use_wordpress_compiler:
+ if self.install_wordpress_compiler:
+ if not install_plugin(self.site, 'wordpress_compiler', output_dir=os.path.join(self.output_folder, 'plugins')):
+ return False
+ else:
+ LOGGER.warn("Make sure to install the WordPress page compiler via")
+ LOGGER.warn(" nikola plugin -i wordpress_compiler")
+ LOGGER.warn("in your imported blog's folder ({0}), if you haven't installed it system-wide or user-wide. Otherwise, your newly imported blog won't compile.".format(self.output_folder))
+
@classmethod
def read_xml_file(cls, filename):
+ """Read XML file into memory."""
xml = []
with open(filename, 'rb') as fd:
@@ -221,12 +376,13 @@ class CommandImportWordpress(Command, ImportMixin):
@classmethod
def get_channel_from_file(cls, filename):
+ """Get channel from XML file."""
tree = etree.fromstring(cls.read_xml_file(filename))
channel = tree.find('channel')
return channel
- @staticmethod
- def populate_context(channel):
+ def populate_context(self, channel):
+ """Populate context with config for the site."""
wordpress_namespace = channel.nsmap['wp']
context = SAMPLE_CONF.copy()
@@ -255,28 +411,31 @@ class CommandImportWordpress(Command, ImportMixin):
author,
'{{{0}}}author_display_name'.format(wordpress_namespace),
"Joe Example")
- context['POSTS'] = '''(
- ("posts/*.rst", "posts", "post.tmpl"),
- ("posts/*.txt", "posts", "post.tmpl"),
- ("posts/*.md", "posts", "post.tmpl"),
- ("posts/*.wp", "posts", "post.tmpl"),
- )'''
- context['PAGES'] = '''(
- ("stories/*.rst", "stories", "story.tmpl"),
- ("stories/*.txt", "stories", "story.tmpl"),
- ("stories/*.md", "stories", "story.tmpl"),
- ("stories/*.wp", "stories", "story.tmpl"),
- )'''
- context['COMPILERS'] = '''{
- "rest": ('.txt', '.rst'),
- "markdown": ('.md', '.mdown', '.markdown', '.wp'),
- "html": ('.html', '.htm')
- }
- '''
+ extensions = ['rst', 'txt', 'md', 'html']
+ if self.use_wordpress_compiler:
+ extensions.append('wp')
+ POSTS = '(\n'
+ PAGES = '(\n'
+ for extension in extensions:
+ POSTS += ' ("posts/*.{0}", "posts", "post.tmpl"),\n'.format(extension)
+ PAGES += ' ("stories/*.{0}", "stories", "story.tmpl"),\n'.format(extension)
+ POSTS += ')\n'
+ PAGES += ')\n'
+ context['POSTS'] = POSTS
+ context['PAGES'] = PAGES
+ COMPILERS = '{\n'
+ COMPILERS += ''' "rest": ('.txt', '.rst'),''' + '\n'
+ COMPILERS += ''' "markdown": ('.md', '.mdown', '.markdown'),''' + '\n'
+ COMPILERS += ''' "html": ('.html', '.htm'),''' + '\n'
+ if self.use_wordpress_compiler:
+ COMPILERS += ''' "wordpress": ('.wp'),''' + '\n'
+ COMPILERS += '}'
+ context['COMPILERS'] = COMPILERS
return context
def download_url_content_to_file(self, url, dst_path):
+ """Download some content (attachments) to a file."""
if self.no_downloads:
return
@@ -291,6 +450,8 @@ class CommandImportWordpress(Command, ImportMixin):
LOGGER.warn("Downloading {0} to {1} failed: {2}".format(url, dst_path, err))
def import_attachment(self, item, wordpress_namespace):
+ """Import an attachment to the site."""
+ # Download main image
url = get_text_tag(
item, '{{{0}}}attachment_url'.format(wordpress_namespace), 'foo')
link = get_text_tag(item, '{{{0}}}link'.format(wordpress_namespace),
@@ -305,59 +466,136 @@ class CommandImportWordpress(Command, ImportMixin):
links[link] = '/' + dst_url
links[url] = '/' + dst_url
- self.download_additional_image_sizes(
- item,
- wordpress_namespace,
- os.path.dirname(url)
- )
-
- def download_additional_image_sizes(self, item, wordpress_namespace, source_path):
- if phpserialize is None:
- return
+ files = [path]
+ files_meta = [{}]
additional_metadata = item.findall('{{{0}}}postmeta'.format(wordpress_namespace))
- if additional_metadata is None:
- return
-
- for element in additional_metadata:
- meta_key = element.find('{{{0}}}meta_key'.format(wordpress_namespace))
- if meta_key is not None and meta_key.text == '_wp_attachment_metadata':
- meta_value = element.find('{{{0}}}meta_value'.format(wordpress_namespace))
-
- if meta_value is None:
- continue
-
- # Someone from Wordpress thought it was a good idea
- # serialize PHP objects into that metadata field. Given
- # that the export should give you the power to insert
- # your blogging into another site or system its not.
- # Why don't they just use JSON?
- if sys.version_info[0] == 2:
- try:
- metadata = phpserialize.loads(utils.sys_encode(meta_value.text))
- except ValueError:
- # local encoding might be wrong sometimes
+ if phpserialize and additional_metadata:
+ source_path = os.path.dirname(url)
+ for element in additional_metadata:
+ meta_key = element.find('{{{0}}}meta_key'.format(wordpress_namespace))
+ if meta_key is not None and meta_key.text == '_wp_attachment_metadata':
+ meta_value = element.find('{{{0}}}meta_value'.format(wordpress_namespace))
+
+ if meta_value is None:
+ continue
+
+ # Someone from Wordpress thought it was a good idea
+ # serialize PHP objects into that metadata field. Given
+ # that the export should give you the power to insert
+ # your blogging into another site or system its not.
+ # Why don't they just use JSON?
+ if sys.version_info[0] == 2:
+ try:
+ metadata = phpserialize.loads(utils.sys_encode(meta_value.text))
+ except ValueError:
+ # local encoding might be wrong sometimes
+ metadata = phpserialize.loads(meta_value.text.encode('utf-8'))
+ else:
metadata = phpserialize.loads(meta_value.text.encode('utf-8'))
- else:
- metadata = phpserialize.loads(meta_value.text.encode('utf-8'))
- size_key = b'sizes'
- file_key = b'file'
-
- if size_key not in metadata:
- continue
-
- for filename in [metadata[size_key][size][file_key] for size in metadata[size_key]]:
- url = '/'.join([source_path, filename.decode('utf-8')])
- path = urlparse(url).path
- dst_path = os.path.join(*([self.output_folder, 'files'] + list(path.split('/'))))
- dst_dir = os.path.dirname(dst_path)
- utils.makedirs(dst_dir)
- LOGGER.info("Downloading {0} => {1}".format(url, dst_path))
- self.download_url_content_to_file(url, dst_path)
- dst_url = '/'.join(dst_path.split(os.sep)[2:])
- links[url] = '/' + dst_url
- links[url] = '/' + dst_url
+ meta_key = b'image_meta'
+ size_key = b'sizes'
+ file_key = b'file'
+ width_key = b'width'
+ height_key = b'height'
+
+ # Extract metadata
+ if width_key in metadata and height_key in metadata:
+ files_meta[0]['width'] = int(metadata[width_key])
+ files_meta[0]['height'] = int(metadata[height_key])
+
+ if meta_key in metadata:
+ image_meta = metadata[meta_key]
+ dst_meta = {}
+
+ def add(our_key, wp_key, is_int=False, ignore_zero=False, is_float=False):
+ if wp_key in image_meta:
+ value = image_meta[wp_key]
+ if is_int:
+ value = int(value)
+ if ignore_zero and value == 0:
+ return
+ elif is_float:
+ value = float(value)
+ if ignore_zero and value == 0:
+ return
+ else:
+ value = value.decode('utf-8') # assume UTF-8
+ if value == '': # skip empty values
+ return
+ dst_meta[our_key] = value
+
+ add('aperture', b'aperture', is_float=True, ignore_zero=True)
+ add('credit', b'credit')
+ add('camera', b'camera')
+ add('caption', b'caption')
+ add('created_timestamp', b'created_timestamp', is_float=True, ignore_zero=True)
+ add('copyright', b'copyright')
+ add('focal_length', b'focal_length', is_float=True, ignore_zero=True)
+ add('iso', b'iso', is_float=True, ignore_zero=True)
+ add('shutter_speed', b'shutter_speed', ignore_zero=True, is_float=True)
+ add('title', b'title')
+
+ if len(dst_meta) > 0:
+ files_meta[0]['meta'] = dst_meta
+
+ # Find other sizes of image
+ if size_key not in metadata:
+ continue
+
+ for size in metadata[size_key]:
+ filename = metadata[size_key][size][file_key]
+ url = '/'.join([source_path, filename.decode('utf-8')])
+
+ # Construct metadata
+ meta = {}
+ meta['size'] = size.decode('utf-8')
+ if width_key in metadata[size_key][size] and height_key in metadata[size_key][size]:
+ meta['width'] = metadata[size_key][size][width_key]
+ meta['height'] = metadata[size_key][size][height_key]
+
+ path = urlparse(url).path
+ dst_path = os.path.join(*([self.output_folder, 'files'] + list(path.split('/'))))
+ dst_dir = os.path.dirname(dst_path)
+ utils.makedirs(dst_dir)
+ LOGGER.info("Downloading {0} => {1}".format(url, dst_path))
+ self.download_url_content_to_file(url, dst_path)
+ dst_url = '/'.join(dst_path.split(os.sep)[2:])
+ links[url] = '/' + dst_url
+
+ files.append(path)
+ files_meta.append(meta)
+
+ # Prepare result
+ result = {}
+ result['files'] = files
+ result['files_meta'] = files_meta
+
+ # Prepare extraction of more information
+ dc_namespace = item.nsmap['dc']
+ content_namespace = item.nsmap['content']
+ excerpt_namespace = item.nsmap['excerpt']
+
+ def add(result_key, key, namespace=None, filter=None, store_empty=False):
+ if namespace is not None:
+ value = get_text_tag(item, '{{{0}}}{1}'.format(namespace, key), None)
+ else:
+ value = get_text_tag(item, key, None)
+ if value is not None:
+ if filter:
+ value = filter(value)
+ if value or store_empty:
+ result[result_key] = value
+
+ add('title', 'title')
+ add('date_utc', 'post_date_gmt', namespace=wordpress_namespace)
+ add('wordpress_user_name', 'creator', namespace=dc_namespace)
+ add('content', 'encoded', namespace=content_namespace)
+ add('excerpt', 'encoded', namespace=excerpt_namespace)
+ add('description', 'description')
+
+ return result
code_re1 = re.compile(r'\[code.* lang.*?="(.*?)?".*\](.*?)\[/code\]', re.DOTALL | re.MULTILINE)
code_re2 = re.compile(r'\[sourcecode.* lang.*?="(.*?)?".*\](.*?)\[/sourcecode\]', re.DOTALL | re.MULTILINE)
@@ -365,6 +603,7 @@ class CommandImportWordpress(Command, ImportMixin):
code_re4 = re.compile(r'\[sourcecode.*?\](.*?)\[/sourcecode\]', re.DOTALL | re.MULTILINE)
def transform_code(self, content):
+ """Transform code blocks."""
# http://en.support.wordpress.com/code/posting-source-code/. There are
# a ton of things not supported here. We only do a basic [code
# lang="x"] -> ```x translation, and remove quoted html entities (<,
@@ -390,26 +629,126 @@ class CommandImportWordpress(Command, ImportMixin):
@staticmethod
def transform_caption(content):
+ """Transform captions."""
new_caption = re.sub(r'\[/caption\]', '', content)
new_caption = re.sub(r'\[caption.*\]', '', new_caption)
return new_caption
def transform_multiple_newlines(self, content):
- """Replaces multiple newlines with only two."""
+ """Replace multiple newlines with only two."""
if self.squash_newlines:
return re.sub(r'\n{3,}', r'\n\n', content)
else:
return content
- def transform_content(self, content):
- content = self.transform_code(content)
- content = self.transform_caption(content)
- content = self.transform_multiple_newlines(content)
- return content
+ def transform_content(self, content, post_format, attachments):
+ """Transform content into appropriate format."""
+ if post_format == 'wp':
+ if self.transform_to_html:
+ additional_data = {}
+ if attachments is not None:
+ additional_data['attachments'] = attachments
+ try:
+ content = self.wordpress_page_compiler.compile_to_string(content, additional_data=additional_data)
+ except TypeError: # old versions of the plugin don't support the additional argument
+ content = self.wordpress_page_compiler.compile_to_string(content)
+ return content, 'html', True
+ elif self.use_wordpress_compiler:
+ return content, 'wp', False
+ else:
+ content = self.transform_code(content)
+ content = self.transform_caption(content)
+ content = self.transform_multiple_newlines(content)
+ return content, 'md', True
+ elif post_format == 'markdown':
+ return content, 'md', True
+ elif post_format == 'none':
+ return content, 'html', True
+ else:
+ return None
+
+ def _extract_comment(self, comment, wordpress_namespace):
+ """Extract comment from dump."""
+ id = int(get_text_tag(comment, "{{{0}}}comment_id".format(wordpress_namespace), None))
+ author = get_text_tag(comment, "{{{0}}}comment_author".format(wordpress_namespace), None)
+ author_email = get_text_tag(comment, "{{{0}}}comment_author_email".format(wordpress_namespace), None)
+ author_url = get_text_tag(comment, "{{{0}}}comment_author_url".format(wordpress_namespace), None)
+ author_IP = get_text_tag(comment, "{{{0}}}comment_author_IP".format(wordpress_namespace), None)
+ # date = get_text_tag(comment, "{{{0}}}comment_date".format(wordpress_namespace), None)
+ date_gmt = get_text_tag(comment, "{{{0}}}comment_date_gmt".format(wordpress_namespace), None)
+ content = get_text_tag(comment, "{{{0}}}comment_content".format(wordpress_namespace), None)
+ approved = get_text_tag(comment, "{{{0}}}comment_approved".format(wordpress_namespace), '0')
+ if approved == '0':
+ approved = 'hold'
+ elif approved == '1':
+ approved = 'approved'
+ elif approved == 'spam' or approved == 'trash':
+ pass
+ else:
+ LOGGER.warn("Unknown comment approved status: " + str(approved))
+ parent = int(get_text_tag(comment, "{{{0}}}comment_parent".format(wordpress_namespace), 0))
+ if parent == 0:
+ parent = None
+ user_id = int(get_text_tag(comment, "{{{0}}}comment_user_id".format(wordpress_namespace), 0))
+ if user_id == 0:
+ user_id = None
+
+ if approved == 'trash' or approved == 'spam':
+ return None
+
+ return {"id": id, "status": str(approved), "approved": approved == "approved",
+ "author": author, "email": author_email, "url": author_url, "ip": author_IP,
+ "date": date_gmt, "content": content, "parent": parent, "user_id": user_id}
+
+ def _write_comment(self, filename, comment):
+ """Write comment to file."""
+ def write_header_line(fd, header_field, header_content):
+ """Write comment header line."""
+ if header_content is None:
+ return
+ header_content = str(header_content).replace('\n', ' ')
+ line = '.. ' + header_field + ': ' + header_content + '\n'
+ fd.write(line.encode('utf8'))
+
+ with open(filename, "wb+") as fd:
+ write_header_line(fd, "id", comment["id"])
+ write_header_line(fd, "status", comment["status"])
+ write_header_line(fd, "approved", comment["approved"])
+ write_header_line(fd, "author", comment["author"])
+ write_header_line(fd, "author_email", comment["email"])
+ write_header_line(fd, "author_url", comment["url"])
+ write_header_line(fd, "author_IP", comment["ip"])
+ write_header_line(fd, "date_utc", comment["date"])
+ write_header_line(fd, "parent_id", comment["parent"])
+ write_header_line(fd, "wordpress_user_id", comment["user_id"])
+ fd.write(('\n' + comment['content']).encode('utf8'))
+
+ def _create_metadata(self, status, excerpt, tags, categories, post_name=None):
+ """Create post metadata."""
+ other_meta = {'wp-status': status}
+ if excerpt is not None:
+ other_meta['excerpt'] = excerpt
+ if self.export_categories_as_categories:
+ cats = []
+ for text in categories:
+ if text in self._category_paths:
+ cats.append(self._category_paths[text])
+ else:
+ cats.append(utils.join_hierarchical_category_path([text]))
+ other_meta['categories'] = ','.join(cats)
+ if len(cats) > 0:
+ other_meta['category'] = cats[0]
+ if len(cats) > 1:
+ LOGGER.warn(('Post "{0}" has more than one category! ' +
+ 'Will only use the first one.').format(post_name))
+ tags_cats = tags
+ else:
+ tags_cats = tags + categories
+ return tags_cats, other_meta
- def import_item(self, item, wordpress_namespace, out_folder=None):
- """Takes an item from the feed and creates a post file."""
+ def import_postpage_item(self, item, wordpress_namespace, out_folder=None, attachments=None):
+ """Take an item from the feed and creates a post file."""
if out_folder is None:
out_folder = 'posts'
@@ -439,7 +778,7 @@ class CommandImportWordpress(Command, ImportMixin):
item, '{{{0}}}post_id'.format(wordpress_namespace), None)
if not slug: # should never happen
LOGGER.error("Error converting post:", title)
- return
+ return False
else:
if len(pathlist) > 1:
out_folder = os.path.join(*([out_folder] + pathlist[:-1]))
@@ -461,23 +800,42 @@ class CommandImportWordpress(Command, ImportMixin):
item, '{{{0}}}status'.format(wordpress_namespace), 'publish')
content = get_text_tag(
item, '{http://purl.org/rss/1.0/modules/content/}encoded', '')
+ excerpt = get_text_tag(
+ item, '{http://wordpress.org/export/1.2/excerpt/}encoded', None)
+
+ if excerpt is not None:
+ if len(excerpt) == 0:
+ excerpt = None
tags = []
+ categories = []
if status == 'trash':
LOGGER.warn('Trashed post "{0}" will not be imported.'.format(title))
- return
+ return False
+ elif status == 'private':
+ tags.append('private')
+ is_draft = False
+ is_private = True
elif status != 'publish':
tags.append('draft')
is_draft = True
+ is_private = False
else:
is_draft = False
+ is_private = False
for tag in item.findall('category'):
text = tag.text
- if text == 'Uncategorized':
+ type = 'category'
+ if 'domain' in tag.attrib:
+ type = tag.attrib['domain']
+ if text == 'Uncategorized' and type == 'category':
continue
- tags.append(text)
self.all_tags.add(text)
+ if type == 'category':
+ categories.append(type)
+ else:
+ tags.append(text)
if '$latex' in content:
tags.append('mathjax')
@@ -487,11 +845,16 @@ class CommandImportWordpress(Command, ImportMixin):
format_tag = [x for x in item.findall('*//{%s}meta_key' % wordpress_namespace) if x.text == '_tc_post_format']
if format_tag:
post_format = format_tag[0].getparent().find('{%s}meta_value' % wordpress_namespace).text
+ if post_format == 'wpautop':
+ post_format = 'wp'
if is_draft and self.exclude_drafts:
LOGGER.notice('Draft "{0}" will not be imported.'.format(title))
-
- elif content.strip():
+ return False
+ elif is_private and self.exclude_privates:
+ LOGGER.notice('Private post "{0}" will not be imported.'.format(title))
+ return False
+ elif content.strip() or self.import_empty_items:
# If no content is found, no files are written.
self.url_map[link] = (self.context['SITE_URL'] +
out_folder.rstrip('/') + '/' + slug +
@@ -503,53 +866,121 @@ class CommandImportWordpress(Command, ImportMixin):
content_translations = {"": content}
default_language = self.context["DEFAULT_LANG"]
for lang, content in content_translations.items():
+ try:
+ content, extension, rewrite_html = self.transform_content(content, post_format, attachments)
+ except:
+ LOGGER.error(('Cannot interpret post "{0}" (language {1}) with post ' +
+ 'format {2}!').format(os.path.join(out_folder, slug), lang, post_format))
+ return False
if lang:
out_meta_filename = slug + '.meta'
if lang == default_language:
- out_content_filename = slug + '.wp'
+ out_content_filename = slug + '.' + extension
else:
out_content_filename \
= utils.get_translation_candidate(self.context,
- slug + ".wp", lang)
+ slug + "." + extension, lang)
self.extra_languages.add(lang)
meta_slug = slug
else:
out_meta_filename = slug + '.meta'
- out_content_filename = slug + '.wp'
+ out_content_filename = slug + '.' + extension
meta_slug = slug
- if post_format == 'wp':
- content = self.transform_content(content)
+ tags, other_meta = self._create_metadata(status, excerpt, tags, categories,
+ post_name=os.path.join(out_folder, slug))
self.write_metadata(os.path.join(self.output_folder, out_folder,
out_meta_filename),
- title, meta_slug, post_date, description, tags)
+ title, meta_slug, post_date, description, tags, **other_meta)
self.write_content(
os.path.join(self.output_folder,
out_folder, out_content_filename),
- content)
+ content,
+ rewrite_html)
+
+ if self.export_comments:
+ comments = []
+ for tag in item.findall('{{{0}}}comment'.format(wordpress_namespace)):
+ comment = self._extract_comment(tag, wordpress_namespace)
+ if comment is not None:
+ comments.append(comment)
+
+ for comment in comments:
+ comment_filename = slug + "." + str(comment['id']) + ".wpcomment"
+ self._write_comment(os.path.join(self.output_folder, out_folder, comment_filename), comment)
+
+ return (out_folder, slug)
else:
- LOGGER.warn('Not going to import "{0}" because it seems to contain'
- ' no content.'.format(title))
+ LOGGER.warn(('Not going to import "{0}" because it seems to contain'
+ ' no content.').format(title))
+ return False
- def process_item(self, item):
+ def _extract_item_info(self, item):
+ """Extract information about an item."""
# The namespace usually is something like:
# http://wordpress.org/export/1.2/
wordpress_namespace = item.nsmap['wp']
post_type = get_text_tag(
item, '{{{0}}}post_type'.format(wordpress_namespace), 'post')
+ post_id = int(get_text_tag(
+ item, '{{{0}}}post_id'.format(wordpress_namespace), "0"))
+ parent_id = get_text_tag(
+ item, '{{{0}}}post_parent'.format(wordpress_namespace), None)
+ return wordpress_namespace, post_type, post_id, parent_id
+
+ def process_item_if_attachment(self, item):
+ """Process attachments."""
+ wordpress_namespace, post_type, post_id, parent_id = self._extract_item_info(item)
if post_type == 'attachment':
- self.import_attachment(item, wordpress_namespace)
- elif post_type == 'post':
- self.import_item(item, wordpress_namespace, 'posts')
- else:
- self.import_item(item, wordpress_namespace, 'stories')
+ data = self.import_attachment(item, wordpress_namespace)
+ # If parent was found, store relation with imported files
+ if parent_id is not None and int(parent_id) != 0:
+ self.attachments[int(parent_id)][post_id] = data
+ else:
+ LOGGER.warn("Attachment #{0} ({1}) has no parent!".format(post_id, data['files']))
+
+ def write_attachments_info(self, path, attachments):
+ """Write attachments info file."""
+ with io.open(path, "wb") as file:
+ file.write(json.dumps(attachments).encode('utf-8'))
+
+ def process_item_if_post_or_page(self, item):
+ """Process posts and pages."""
+ wordpress_namespace, post_type, post_id, parent_id = self._extract_item_info(item)
+
+ if post_type != 'attachment':
+ # Get attachments for post
+ attachments = self.attachments.pop(post_id, None)
+ # Import item
+ if post_type == 'post':
+ out_folder_slug = self.import_postpage_item(item, wordpress_namespace, 'posts', attachments)
+ else:
+ out_folder_slug = self.import_postpage_item(item, wordpress_namespace, 'stories', attachments)
+ # Process attachment data
+ if attachments is not None:
+ # If post was exported, store data
+ if out_folder_slug:
+ destination = os.path.join(self.output_folder, out_folder_slug[0],
+ out_folder_slug[1] + ".attachments.json")
+ self.write_attachments_info(destination, attachments)
def import_posts(self, channel):
+ """Import posts into the site."""
+ self.attachments = defaultdict(dict)
+ # First process attachments
+ for item in channel.findall('item'):
+ self.process_item_if_attachment(item)
+ # Next process posts
for item in channel.findall('item'):
- self.process_item(item)
+ self.process_item_if_post_or_page(item)
+ # Assign attachments to posts
+ for post_id in self.attachments:
+ LOGGER.warn(("Found attachments for post or page #{0}, but didn't find post or page. " +
+ "(Attachments: {1})").format(post_id, [e['files'][0] for e in self.attachments[post_id].values()]))
def get_text_tag(tag, name, default):
+ """Get the text of an XML tag."""
if tag is None:
return default
t = tag.find(name)
@@ -560,9 +991,10 @@ def get_text_tag(tag, name, default):
def separate_qtranslate_content(text):
- """Parse the content of a wordpress post or page and separate
- the various language specific contents when they are delimited
- with qtranslate tags: <!--:LL-->blabla<!--:-->"""
+ """Parse the content of a wordpress post or page and separate qtranslate languages.
+
+ qtranslate tags: <!--:LL-->blabla<!--:-->
+ """
# TODO: uniformize qtranslate tags <!--/en--> => <!--:-->
qt_start = "<!--:"
qt_end = "-->"