summary | refs | log | tree | commit | diff | stats
path: root/nikola/plugins/command_import_wordpress.py
diff options
context:
space:
mode:
author: Agustin Henze <tin@sluc.org.ar> 2013-01-02 08:35:03 -0300
committer: Agustin Henze <tin@sluc.org.ar> 2013-01-02 08:35:03 -0300
commit 9c5708cc92af894e414bc76ee35ec2230de5d288 (patch)
tree 61bd56b5517a4713626c254981143e008c719469 /nikola/plugins/command_import_wordpress.py
parent 0f2c04e70a0ffdd0892d6970cafbcd952d221db5 (diff)
Imported Upstream version 5.1 (tag: upstream/5.1)
Diffstat (limited to 'nikola/plugins/command_import_wordpress.py')
-rw-r--r-- nikola/plugins/command_import_wordpress.py 330
1 file changed, 225 insertions, 105 deletions
diff --git a/nikola/plugins/command_import_wordpress.py b/nikola/plugins/command_import_wordpress.py
index e75d022..1552da4 100644
--- a/nikola/plugins/command_import_wordpress.py
+++ b/nikola/plugins/command_import_wordpress.py
@@ -1,11 +1,45 @@
+# Copyright (c) 2012 Roberto Alsina y otros.
+
+# Permission is hereby granted, free of charge, to any
+# person obtaining a copy of this software and associated
+# documentation files (the "Software"), to deal in the
+# Software without restriction, including without limitation
+# the rights to use, copy, modify, merge, publish,
+# distribute, sublicense, and/or sell copies of the
+# Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice
+# shall be included in all copies or substantial portions of
+# the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
+# KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
+# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
+# PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS
+# OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+from __future__ import unicode_literals, print_function
import codecs
+import csv
import os
-from urlparse import urlparse
-from urllib import urlopen
+import re
+try:
+ from urlparse import urlparse
+except ImportError:
+ from urllib.parse import urlparse
-from lxml import etree, html
+from lxml import etree, html, builder
from mako.template import Template
+try:
+ import requests
+except ImportError:
+ requests = None
+
from nikola.plugin_categories import Command
from nikola import utils
@@ -17,38 +51,67 @@ class CommandImportWordpress(Command):
name = "import_wordpress"
- def run(self, fname=None):
- # Parse the data
- if fname is None:
- print "Usage: nikola import_wordpress wordpress_dump.xml"
- return
- context = {}
- with open(fname) as fd:
- xml = []
+ @staticmethod
+ def read_xml_file(filename):
+ xml = []
+
+ with open(filename, 'rb') as fd:
for line in fd:
# These explode etree and are useless
- if '<atom:link rel=' in line:
+ if b'<atom:link rel=' in line:
continue
xml.append(line)
- xml = '\n'.join(xml)
+ xml = b'\n'.join(xml)
- tree = etree.fromstring(xml)
+ return xml
+
+ @classmethod
+ def get_channel_from_file(cls, filename):
+ tree = etree.fromstring(cls.read_xml_file(filename))
channel = tree.find('channel')
+ return channel
+
+ @staticmethod
+ def configure_redirections(url_map):
+ redirections = []
+ for k, v in url_map.items():
+ # remove the initial "/" because src is a relative file path
+ src = (urlparse(k).path + 'index.html')[1:]
+ dst = (urlparse(v).path)
+ if src == 'index.html':
+ print("Can't do a redirect for: %r" % k)
+ else:
+ redirections.append((src, dst))
+
+ return redirections
+ @staticmethod
+ def generate_base_site(context):
+ os.system('nikola init new_site')
+ conf_template = Template(filename=os.path.join(
+ os.path.dirname(utils.__file__), 'conf.py.in'))
+
+ return conf_template
+
+ @staticmethod
+ def populate_context(channel):
+ wordpress_namespace = channel.nsmap['wp']
+
+ context = {}
context['DEFAULT_LANG'] = get_text_tag(channel, 'language', 'en')[:2]
- context['BLOG_TITLE'] = get_text_tag(
- channel, 'title', 'PUT TITLE HERE')
+ context['BLOG_TITLE'] = get_text_tag(channel, 'title',
+ 'PUT TITLE HERE')
context['BLOG_DESCRIPTION'] = get_text_tag(
channel, 'description', 'PUT DESCRIPTION HERE')
context['BLOG_URL'] = get_text_tag(channel, 'link', '#')
- author = channel.find('{http://wordpress.org/export/1.2/}author')
+ author = channel.find('{%s}author' % wordpress_namespace)
context['BLOG_EMAIL'] = get_text_tag(
author,
- '{http://wordpress.org/export/1.2/}author_email',
+ '{%s}author_email' % wordpress_namespace,
"joe@example.com")
context['BLOG_AUTHOR'] = get_text_tag(
author,
- '{http://wordpress.org/export/1.2/}author_display_name',
+ '{%s}author_display_name' % wordpress_namespace,
"Joe Example")
context['POST_PAGES'] = '''(
("posts/*.wp", "posts", "post.tmpl", True),
@@ -61,19 +124,149 @@ class CommandImportWordpress(Command):
}
'''
- # Generate base site
- os.system('nikola init new_site')
- conf_template = Template(filename=os.path.join(
- os.path.dirname(utils.__file__), 'data', 'samplesite', 'conf.py.in'))
- with codecs.open(os.path.join('new_site', 'conf.py'),
- 'w+', 'utf8') as fd:
- fd.write(conf_template.render(**context))
+ return context
- # Import posts
- for item in channel.findall('item'):
- import_attachment(item)
+ @staticmethod
+ def download_url_content_to_file(url, dst_path):
+ with open(dst_path, 'wb+') as fd:
+ fd.write(requests.get(url).content)
+
+ def import_attachment(self, item, wordpress_namespace):
+ url = get_text_tag(item, '{%s}attachment_url' % wordpress_namespace, 'foo')
+ link = get_text_tag(item, '{%s}link' % wordpress_namespace, 'foo')
+ path = urlparse(url).path
+ dst_path = os.path.join(*(['new_site', 'files']
+ + list(path.split('/'))))
+ dst_dir = os.path.dirname(dst_path)
+ if not os.path.isdir(dst_dir):
+ os.makedirs(dst_dir)
+ print("Downloading %s => %s" % (url, dst_path))
+ self.download_url_content_to_file(url, dst_path)
+ dst_url = '/'.join(dst_path.split(os.sep)[2:])
+ links[link] = '/' + dst_url
+ links[url] = '/' + dst_url
+
+ @staticmethod
+ def write_content(filename, content):
+ with open(filename, "wb+") as fd:
+ if content.strip():
+ # Handle sourcecode pseudo-tags
+ content = re.sub('\[sourcecode language="([^"]+)"\]',
+ "\n~~~~~~~~~~~~{.\\1}\n", content)
+ content = content.replace('[/sourcecode]', "\n~~~~~~~~~~~~\n")
+ doc = html.document_fromstring(content)
+ doc.rewrite_links(replacer)
+ # Replace H1 elements with H2 elements
+ for tag in doc.findall('.//h1'):
+ if not tag.text:
+ print("Failed to fix bad title: %r" %
+ html.tostring(tag))
+ else:
+ tag.getparent().replace(tag, builder.E.h2(tag.text))
+ fd.write(html.tostring(doc, encoding='utf8'))
+
+ @staticmethod
+ def write_metadata(filename, title, slug, post_date, description, tags):
+ with codecs.open(filename, "w+", "utf8") as fd:
+ fd.write('%s\n' % title)
+ fd.write('%s\n' % slug)
+ fd.write('%s\n' % post_date)
+ fd.write('%s\n' % ','.join(tags))
+ fd.write('\n')
+ fd.write('%s\n' % description)
+
+ def import_item(self, item, wordpress_namespace, out_folder=None):
+ """Takes an item from the feed and creates a post file."""
+ if out_folder is None:
+ out_folder = 'posts'
+
+ title = get_text_tag(item, 'title', 'NO TITLE')
+ # link is something like http://foo.com/2012/09/01/hello-world/
+ # So, take the path, utils.slugify it, and that's our slug
+ link = get_text_tag(item, 'link', None)
+ slug = utils.slugify(urlparse(link).path)
+ if not slug: # it happens if the post has no "nice" URL
+ slug = get_text_tag(item, '{%s}post_name' % wordpress_namespace, None)
+ if not slug: # it *may* happen
+ slug = get_text_tag(item, '{%s}post_id' % wordpress_namespace, None)
+ if not slug: # should never happen
+ print("Error converting post:", title)
+ return
+
+ description = get_text_tag(item, 'description', '')
+ post_date = get_text_tag(item, '{%s}post_date' % wordpress_namespace, None)
+ status = get_text_tag(item, '{%s}status' % wordpress_namespace, 'publish')
+ content = get_text_tag(
+ item, '{http://purl.org/rss/1.0/modules/content/}encoded', '')
+
+ tags = []
+ if status != 'publish':
+ tags.append('draft')
+ for tag in item.findall('category'):
+ text = tag.text
+ if text == 'Uncategorized':
+ continue
+ tags.append(text)
+
+ self.url_map[link] = self.context['BLOG_URL'] + '/' + \
+ out_folder + '/' + slug + '.html'
+
+ self.write_metadata(os.path.join('new_site', out_folder,
+ slug + '.meta'),
+ title, slug, post_date, description, tags)
+ self.write_content(
+ os.path.join('new_site', out_folder, slug + '.wp'), content)
+
+ def process_item(self, item):
+ # The namespace usually is something like:
+ # http://wordpress.org/export/1.2/
+ wordpress_namespace = item.nsmap['wp']
+ post_type = get_text_tag(item, '{%s}post_type' % wordpress_namespace, 'post')
+
+ if post_type == 'attachment':
+ self.import_attachment(item, wordpress_namespace)
+ elif post_type == 'post':
+ self.import_item(item, wordpress_namespace, 'posts')
+ else:
+ self.import_item(item, wordpress_namespace, 'stories')
+
+ def import_posts(self, channel):
for item in channel.findall('item'):
- import_item(item)
+ self.process_item(item)
+
+ @staticmethod
+ def write_urlmap_csv(output_file, url_map):
+ with codecs.open(output_file, 'w+', 'utf8') as fd:
+ csv_writer = csv.writer(fd)
+ for item in url_map.items():
+ csv_writer.writerow(item)
+
+ @staticmethod
+ def write_configuration(filename, rendered_template):
+ with codecs.open(filename, 'w+', 'utf8') as fd:
+ fd.write(rendered_template)
+
+ def run(self, fname=None):
+ # Parse the data
+ if requests is None:
+ print('To use the import_wordpress command, you have to install the "requests" package.')
+ return
+ if fname is None:
+ print("Usage: nikola import_wordpress wordpress_dump.xml")
+ return
+
+ self.url_map = {}
+ channel = self.get_channel_from_file(fname)
+ self.context = self.populate_context(channel)
+ conf_template = self.generate_base_site(self.context)
+ self.context['REDIRECTIONS'] = self.configure_redirections(
+ self.url_map)
+
+ self.import_posts(channel)
+ self.write_urlmap_csv(
+ os.path.join('new_site', 'url_map.csv'), self.url_map)
+ self.write_configuration(os.path.join(
+ 'new_site', 'conf.py'), conf_template.render(**self.context))
def replacer(dst):
@@ -81,83 +274,10 @@ def replacer(dst):
def get_text_tag(tag, name, default):
+ if tag is None:
+ return default
t = tag.find(name)
if t is not None:
return t.text
else:
return default
-
-
-def import_attachment(item):
- post_type = get_text_tag(item,
- '{http://wordpress.org/export/1.2/}post_type', 'post')
- if post_type == 'attachment':
- url = get_text_tag(item,
- '{http://wordpress.org/export/1.2/}attachment_url', 'foo')
- link = get_text_tag(item,
- '{http://wordpress.org/export/1.2/}link', 'foo')
- path = urlparse(url).path
- dst_path = os.path.join(*(['new_site', 'files']
- + list(path.split('/'))))
- dst_dir = os.path.dirname(dst_path)
- if not os.path.isdir(dst_dir):
- os.makedirs(dst_dir)
- print "Downloading %s => %s" % (url, dst_path)
- with open(dst_path, 'wb+') as fd:
- fd.write(urlopen(url).read())
- dst_url = '/'.join(dst_path.split(os.sep)[2:])
- links[link] = '/' + dst_url
- links[url] = '/' + dst_url
- return
-
-
-def import_item(item):
- """Takes an item from the feed and creates a post file."""
- title = get_text_tag(item, 'title', 'NO TITLE')
- # link is something like http://foo.com/2012/09/01/hello-world/
- # So, take the path, utils.slugify it, and that's our slug
- slug = utils.slugify(urlparse(get_text_tag(item, 'link', None)).path)
- description = get_text_tag(item, 'description', '')
- post_date = get_text_tag(item,
- '{http://wordpress.org/export/1.2/}post_date', None)
- post_type = get_text_tag(item,
- '{http://wordpress.org/export/1.2/}post_type', 'post')
- status = get_text_tag(item,
- '{http://wordpress.org/export/1.2/}status', 'publish')
- content = get_text_tag(item,
- '{http://purl.org/rss/1.0/modules/content/}encoded', '')
-
- tags = []
- if status != 'publish':
- tags.append('draft')
- for tag in item.findall('category'):
- text = tag.text
- if text == 'Uncategorized':
- continue
- tags.append(text)
-
- if post_type == 'attachment':
- return
- elif post_type == 'post':
- out_folder = 'posts'
- else:
- out_folder = 'stories'
- # Write metadata
- with codecs.open(os.path.join('new_site', out_folder, slug + '.meta'),
- "w+", "utf8") as fd:
- fd.write(u'%s\n' % title)
- fd.write(u'%s\n' % slug)
- fd.write(u'%s\n' % post_date)
- fd.write(u'%s\n' % ','.join(tags))
- fd.write(u'\n')
- fd.write(u'%s\n' % description)
- with open(os.path.join(
- 'new_site', out_folder, slug + '.wp'), "wb+") as fd:
- if content.strip():
- try:
- doc = html.document_fromstring(content)
- doc.rewrite_links(replacer)
- fd.write(html.tostring(doc, encoding='utf8'))
- except:
- import pdb
- pdb.set_trace()