Migrate blog content from an old aldryn-newsblog instance to djangocms-stories on django CMS 4+.

Blog Content Migration Scripts

These scripts export the plugin content of aldryn_newsblog articles to a JSON file, then import that content into djangocms-stories posts.

How it works

  1. Serialize the plugins found in the placeholder of each blog post (export).
  2. Write the serialized content to a JSON file (export).
  3. Read the serialized content, map the old plugin types to the new ones, and try to create them (import).
  4. Save those plugins in the placeholder of the matching stories (import).
  5. Create a draft version of each blog article and publish it automatically (import).
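
As a rough illustration of step 3, the sketch below shows how exported plugin types could be mapped to their new equivalents while grid wrappers are skipped and their children kept. The mapping values mirror the ones in the import script, but `plan_import` is a hypothetical helper written for this example: it only returns the list of plugin types that would be created and does not touch the database.

```python
# Hypothetical, simplified sketch: the real import script creates plugins
# with cms.api.add_plugin; this helper only computes what would be created.
PLUGIN_TYPE_MAPPING = {
    "FilerImagePlugin": "ImagePlugin",
    "Bootstrap4PicturePlugin": "ImagePlugin",
    "PicturePlugin": "ImagePlugin",
    "Bootstrap4LinkPlugin": "LinkPlugin",
}

# Wrapper plugins that are not recreated; only their children are imported.
UNSUPPORTED_WRAPPERS = {
    "Bootstrap4GridContainerPlugin",
    "Bootstrap4GridRowPlugin",
    "Bootstrap4GridColumnPlugin",
}


def plan_import(plugins_data):
    """Return the flat, ordered list of plugin types that would be created."""
    planned = []
    for plugin_data in plugins_data:
        plugin_type = plugin_data.get("plugin_type")
        children = plugin_data.get("children") or []
        if plugin_type in UNSUPPORTED_WRAPPERS:
            # Skip the wrapper itself but keep walking its children.
            planned.extend(plan_import(children))
            continue
        planned.append(PLUGIN_TYPE_MAPPING.get(plugin_type, plugin_type))
        planned.extend(plan_import(children))
    return planned
```

Calling `plan_import` on the "plugins" list of one exported article gives a quick preview of the target plugin types before anything is written.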

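Note: You still need to create the blog posts yourself, as these scripts expect to find empty blog posts with matching slugs on the new site. The language is hard-coded in both scripts ("en" in the export, "fr" in the import), so adjust it to your site before running them.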

How to use it

  1. Put export_blog_content.py in the management/commands/ folder of an app on your old website.
  2. Put import_blog_content.py in the management/commands/ folder of an app on your new website.
  3. Run python manage.py export_blog_content <file_name>.json on your old website.
  4. Copy the generated <file_name>.json to the root of your new website.
  5. Run python manage.py import_blog_content <file_name>.json on your new website.
  6. If everything goes well, your text, links and images should be ready on your new website.
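
Between steps 4 and 5 it can help to check what the export file actually contains. The snippet below is a small, standalone sketch (not part of the gist) that counts plugin types in the exported JSON, following the "slug" / "plugins" / "children" layout shown in example_export.json. The function names `load_export` and `summarize_export` are made up for this example.

```python
import json
from collections import Counter


def load_export(path):
    """Load the JSON file written by export_blog_content."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def summarize_export(posts):
    """Count plugin types across all posts, including nested children."""
    counts = Counter()

    def walk(plugins):
        for plugin in plugins:
            counts[plugin.get("plugin_type", "Unknown")] += 1
            walk(plugin.get("children") or [])

    for post in posts:
        walk(post.get("plugins") or [])
    return counts
```

For instance, `summarize_export(load_export("blog_content.json"))` returns a `Counter` keyed by plugin type, which makes unexpected or unsupported plugin types easy to spot before the import.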

Import options

The import_blog_content command supports several useful options:

  • --dry-run: Run in simulation mode without making any modifications. Useful for testing.
  • --clear-existing: Delete existing content in placeholders before importing new content.
  • --site-url <URL>: Specify the public site URL (old site) to download images from. Defaults to https://www.example.com.

Example:

python manage.py import_blog_content blog_content.json --dry-run --clear-existing --site-url https://old-site.com
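
To make the role of --site-url more concrete, here is one plausible way an image download URL could be derived from the exported data. This is an assumption, not the script's actual logic: `build_image_url` is a hypothetical helper, and the `/media/` prefix is inferred from the `img_src` values in the example export (it depends on the old site's MEDIA_URL).

```python
from urllib.parse import urljoin


def build_image_url(site_url, image_info, attributes=None):
    """Guess a download URL for an exported image (illustrative only)."""
    base = site_url.rstrip("/") + "/"
    file_path = image_info.get("file_path")
    if file_path:
        # Assumption: media files are served under /media/ on the old site.
        return urljoin(base, "media/" + file_path.lstrip("/"))
    # Fall back to the rendered image path stored in the exported attributes.
    img_src = (attributes or {}).get("img_src")
    if img_src:
        return urljoin(base, img_src.lstrip("/"))
    return None
```

With `--site-url https://old-site.com`, an image whose exported `file_path` is `filer_public/a/b.jpg` would resolve to `https://old-site.com/media/filer_public/a/b.jpg` under these assumptions.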

Features

  • Automatic image handling: The import script can automatically download images from the old site and import them into filer. It first looks for an existing local image with the same filename, and downloads the file only if none is found.
  • Plugin ID mapping: The script automatically updates plugin IDs in HTML content (TextPlugin body) to match the new plugin IDs created during import.
  • Version management: Automatically creates draft versions and publishes them after import when using djangocms-versioning.
  • Plugin type conversion: Automatically converts old plugin types (Bootstrap4PicturePlugin, Bootstrap4LinkPlugin, etc.) to their modern equivalents (ImagePlugin, LinkPlugin).
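
The plugin ID mapping can be illustrated with a small standalone function. This is a minimal sketch under the assumption that only the `id` attribute of each `<cms-plugin>` tag needs rewriting; `remap_plugin_ids` is a hypothetical name, and the import script may handle the `alt` and `title` attributes differently.

```python
import re

# Matches opening <cms-plugin ...> tags only (closing tags start with "</").
CMS_PLUGIN_TAG = re.compile(r"<cms-plugin\b[^>]*>")


def remap_plugin_ids(body, id_mapping):
    """Replace old plugin IDs with new ones inside <cms-plugin> tags.

    id_mapping maps old IDs to new IDs, both as strings.
    IDs missing from the mapping are left untouched.
    """

    def fix_tag(match):
        tag = match.group(0)
        return re.sub(
            r'\bid="(\d+)"',
            lambda m: 'id="{}"'.format(id_mapping.get(m.group(1), m.group(1))),
            tag,
        )

    return CMS_PLUGIN_TAG.sub(fix_tag, body)
```

Restricting the substitution to `<cms-plugin>` tags avoids touching unrelated `id` attributes elsewhere in the HTML body.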

Misc

  • An example of the generated JSON is available in this gist as example_export.json.
  • A lot of this code was written by Claude, so don't expect easy-to-read functions or workflows.

example_export.json

[
{
"slug": "lorem-ipsum-dolor-sit-amet",
"plugins": [
{
"plugin_type": "TextPlugin",
"position": 0,
"body": "<p>Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.</p>\n\n<p><cms-plugin alt=\"Picture / Image - 42097 \" title=\"Picture / Image - 42097\" id=\"42097\"></cms-plugin></p>\n\n<p>Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur.</p>\n\n<p>Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. <cms-plugin alt=\"Link / Button - 42098 \" title=\"Link / Button - 42098\" id=\"42098\"></cms-plugin> et <cms-plugin alt=\"Link / Button - 42099 \" title=\"Link / Button - 42099\" id=\"42099\"></cms-plugin>.</p>",
"children": [
{
"plugin_type": "Bootstrap4PicturePlugin",
"position": 0,
"image": {
"file_path": "filer_public/placeholder/placeholder_1119__800x600_q85.jpg",
"original_filename": "placeholder_1119.jpg",
"filer_id": 1119
},
"attributes": {
"alignment": "",
"alphabet": "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ",
"attributes_str": "",
"caption_text": "",
"child_plugin_instances": null,
"cmsplugin_ptr_id": 42097,
"depth": 2,
"external_picture": null,
"gap": 1,
"height": null,
"id": 42097,
"img_src": "/media/filer_public_thumbnails/filer_public/placeholder/placeholder_1119__800x600_q85.jpg",
"img_srcset_data": null,
"is_responsive_image": false,
"language": "fr",
"link_attributes_str": "",
"link_page": null,
"link_page_id": null,
"link_target": "",
"link_url": null,
"numchild": 0,
"numconv_obj_": null,
"parent_id": 42096,
"path": "02860001",
"picture_fluid": true,
"picture_id": 1119,
"picture_rounded": false,
"picture_thumbnail": false,
"pk": 42097,
"placeholder_id": 2053,
"plugin_type": "Bootstrap4PicturePlugin",
"position": 0,
"steplen": 4,
"template": "default",
"thumbnail_options": null,
"thumbnail_options_id": null,
"use_automatic_scaling": true,
"use_crop": false,
"use_no_cropping": false,
"use_responsive_image": "inherit",
"use_upscale": false,
"width": null
}
},
{
"plugin_type": "Bootstrap4LinkPlugin",
"position": 1,
"name": "About",
"url": "/about/",
"attributes": {
"alphabet": "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ",
"anchor": "",
"attributes_str": "",
"child_plugin_instances": null,
"cmsplugin_ptr_id": 42098,
"depth": 2,
"external_link": "/about/",
"file_link": null,
"file_link_id": null,
"gap": 1,
"icon_left": "",
"icon_right": "",
"id": 42098,
"internal_link": null,
"internal_link_id": null,
"language": "fr",
"link_block": false,
"link_context": "",
"link_is_optional": false,
"link_outline": false,
"link_size": "",
"link_type": "link",
"mailto": "",
"name": "About",
"numchild": 0,
"numconv_obj_": null,
"parent_id": 42096,
"path": "02860002",
"phone": "",
"pk": 42098,
"placeholder_id": 2053,
"plugin_type": "Bootstrap4LinkPlugin",
"position": 1,
"steplen": 4,
"target": "",
"template": "default"
},
"target": ""
},
{
"plugin_type": "Bootstrap4LinkPlugin",
"position": 2,
"name": "Contact",
"url": "/contact/",
"attributes": {
"alphabet": "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ",
"anchor": "",
"attributes_str": "",
"child_plugin_instances": null,
"cmsplugin_ptr_id": 42099,
"depth": 2,
"external_link": "/contact/",
"file_link": null,
"file_link_id": null,
"gap": 1,
"icon_left": "",
"icon_right": "",
"id": 42099,
"internal_link": null,
"internal_link_id": null,
"language": "fr",
"link_block": false,
"link_context": "",
"link_is_optional": false,
"link_outline": false,
"link_size": "",
"link_type": "link",
"mailto": "",
"name": "Contact",
"numchild": 0,
"numconv_obj_": null,
"parent_id": 42096,
"path": "02860003",
"phone": "",
"pk": 42099,
"placeholder_id": 2053,
"plugin_type": "Bootstrap4LinkPlugin",
"position": 2,
"steplen": 4,
"target": "",
"template": "default"
},
"target": ""
}
]
}
]
},
{
"slug": "consectetur-adipiscing-elit",
"plugins": [
{
"plugin_type": "TextPlugin",
"position": 0,
"body": "<p>Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris.</p>\n\n<p><cms-plugin alt=\"Picture / Image - 42093 \" title=\"Picture / Image - 42093\" id=\"42093\"></cms-plugin></p>\n\n<p>Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.</p>\n\n<p><cms-plugin alt=\"Picture / Image - 42094 \" title=\"Picture / Image - 42094\" id=\"42094\"></cms-plugin></p>\n\n<p>Sed ut perspiciatis unde omnis iste natus error sit voluptatem accusantium doloremque laudantium. <cms-plugin alt=\"Link / Button - 42096 \" title=\"Link / Button - 42096\" id=\"42096\"></cms-plugin>.</p>",
"children": [
{
"plugin_type": "Bootstrap4PicturePlugin",
"position": 0,
"image": {
"file_path": "filer_public/placeholder/placeholder_1115__800x600_q85.jpg",
"original_filename": "placeholder_1115.jpg",
"filer_id": 1115
},
"attributes": {
"alignment": "",
"alphabet": "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ",
"attributes_str": "",
"caption_text": "",
"child_plugin_instances": null,
"cmsplugin_ptr_id": 42093,
"depth": 2,
"external_picture": null,
"gap": 1,
"height": null,
"id": 42093,
"img_src": "/media/filer_public_thumbnails/filer_public/placeholder/placeholder_1115__800x600_q85.jpg",
"img_srcset_data": null,
"is_responsive_image": false,
"language": "fr",
"link_attributes_str": "",
"link_page": null,
"link_page_id": null,
"link_target": "",
"link_url": null,
"numchild": 0,
"numconv_obj_": null,
"parent_id": 42092,
"path": "02850001",
"picture_fluid": true,
"picture_id": 1115,
"picture_rounded": false,
"picture_thumbnail": false,
"pk": 42093,
"placeholder_id": 2052,
"plugin_type": "Bootstrap4PicturePlugin",
"position": 0,
"steplen": 4,
"template": "default",
"thumbnail_options": null,
"thumbnail_options_id": null,
"use_automatic_scaling": true,
"use_crop": false,
"use_no_cropping": false,
"use_responsive_image": "inherit",
"use_upscale": false,
"width": null
}
},
{
"plugin_type": "Bootstrap4PicturePlugin",
"position": 1,
"image": {
"file_path": "filer_public/placeholder/placeholder_1116__800x600_q85.jpg",
"original_filename": "placeholder_1116.jpg",
"filer_id": 1116
},
"attributes": {
"alignment": "",
"alphabet": "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ",
"attributes_str": "",
"caption_text": "",
"child_plugin_instances": null,
"cmsplugin_ptr_id": 42094,
"depth": 2,
"external_picture": null,
"gap": 1,
"height": null,
"id": 42094,
"img_src": "/media/filer_public_thumbnails/filer_public/placeholder/placeholder_1116__800x600_q85.jpg",
"img_srcset_data": null,
"is_responsive_image": false,
"language": "fr",
"link_attributes_str": "",
"link_page": null,
"link_page_id": null,
"link_target": "",
"link_url": null,
"numchild": 0,
"numconv_obj_": null,
"parent_id": 42092,
"path": "02850002",
"picture_fluid": true,
"picture_id": 1116,
"picture_rounded": false,
"picture_thumbnail": false,
"pk": 42094,
"placeholder_id": 2052,
"plugin_type": "Bootstrap4PicturePlugin",
"position": 1,
"steplen": 4,
"template": "default",
"thumbnail_options": null,
"thumbnail_options_id": null,
"use_automatic_scaling": true,
"use_crop": false,
"use_no_cropping": false,
"use_responsive_image": "inherit",
"use_upscale": false,
"width": null
}
},
{
"plugin_type": "Bootstrap4LinkPlugin",
"position": 2,
"name": "About",
"url": "/about/",
"attributes": {
"alphabet": "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ",
"anchor": "",
"attributes_str": "",
"child_plugin_instances": null,
"cmsplugin_ptr_id": 42096,
"depth": 2,
"external_link": "/about/",
"file_link": null,
"file_link_id": null,
"gap": 1,
"icon_left": "",
"icon_right": "",
"id": 42096,
"internal_link": null,
"internal_link_id": null,
"language": "fr",
"link_block": false,
"link_context": "",
"link_is_optional": false,
"link_outline": false,
"link_size": "",
"link_type": "link",
"mailto": "",
"name": "About",
"numchild": 0,
"numconv_obj_": null,
"parent_id": 42092,
"path": "02850003",
"phone": "",
"pk": 42096,
"placeholder_id": 2052,
"plugin_type": "Bootstrap4LinkPlugin",
"position": 2,
"steplen": 4,
"target": "",
"template": "default"
},
"target": ""
}
]
}
]
}
]

export_blog_content.py

#!/usr/bin/env python
"""
Script to export the content of the "content" placeholder from all blog articles
for Django CMS 3.11 (aldryn-newsblog)

Usage:
    python manage.py export_blog_content output.json
"""
import json

from django.core.management.base import BaseCommand
from django.core.exceptions import ObjectDoesNotExist
from django.conf import settings

try:
    from aldryn_newsblog.models import Article

    ALDRYN_NEWSBLOG_AVAILABLE = True
except ImportError:
    ALDRYN_NEWSBLOG_AVAILABLE = False

from cms.models import Placeholder, CMSPlugin, Page


class Command(BaseCommand):
    help = "Export the content of the 'content' placeholder from all blog articles (aldryn-newsblog)"

    def add_arguments(self, parser):
        parser.add_argument(
            "output_file",
            type=str,
            help="Path to the output JSON file"
        )

    def handle(self, *args, **options):
        if not ALDRYN_NEWSBLOG_AVAILABLE:
            self.stdout.write(
                self.style.ERROR("aldryn-newsblog is not available")
            )
            return

        output_file = options["output_file"]

        # Get all articles
        articles = Article.objects.all()
        exported_data = []

        for article in articles:
            # Get the "content" placeholder
            # In aldryn-newsblog, the placeholder is usually accessible via
            # article.content or article.placeholders
            placeholder = None

            # Try first via the content attribute (if it's a PlaceholderField)
            if hasattr(article, "content"):
                placeholder = article.content

            # Otherwise, try via the placeholders relation
            if not placeholder:
                placeholder = article.placeholders.filter(slot="content").first()

            # If still not found, try to search all placeholders
            if not placeholder:
                placeholders = article.placeholders.all()
                if placeholders.exists():
                    # Take the first placeholder (usually "content")
                    placeholder = placeholders.first()

            if not placeholder:
                self.stdout.write(
                    self.style.WARNING(
                        f"Article '{article.pk}' does not have a 'content' placeholder"
                    )
                )
                continue

            # Get the slug
            # aldryn-newsblog may use parler for translations
            slug = None

            # Try to get the slug in the default language
            if hasattr(article, "get_slug"):
                slug = article.get_slug(language="en")

            # Try slug_en (if parler is used)
            if not slug:
                slug = getattr(article, "slug_en", None)

            # Fallback to the default slug
            if not slug:
                slug = getattr(article, "slug", None) or str(article.pk)

            # Export the placeholder plugins
            plugins_data = self.export_plugins(placeholder, language="en")

            exported_data.append({
                "slug": slug,
                "plugins": plugins_data
            })

            self.stdout.write(
                self.style.SUCCESS(
                    f"Exported content from article '{slug}' ({len(plugins_data)} plugins)"
                )
            )

        # Save to JSON file
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(exported_data, f, ensure_ascii=False, indent=2)

        self.stdout.write(
            self.style.SUCCESS(
                f"\nExport completed: {len(exported_data)} articles exported to {output_file}"
            )
        )

    def export_plugins(self, placeholder, language="en"):
        """
        Recursively export all plugins from a placeholder
        """
        # Get root plugins (without parent)
        root_plugins = CMSPlugin.objects.filter(
            placeholder=placeholder,
            language=language,
            parent=None
        ).order_by("position")

        plugins_data = []
        for plugin in root_plugins:
            plugin_data = self.export_plugin_recursive(plugin)
            if plugin_data:
                plugins_data.append(plugin_data)

        return plugins_data

    def export_plugin_recursive(self, plugin):
        """
        Export a plugin and its children recursively
        """
        # Try to get the plugin instance
        # Some plugins may have problematic relations
        instance, plugin_class = plugin.get_plugin_instance()
        if not instance:
            return None

        plugin_data = {
            "plugin_type": plugin.plugin_type,
            "position": plugin.position,
        }

        # Export data according to plugin type
        if plugin.plugin_type == "TextPlugin":
            # Text plugin (djangocms-text-ckeditor)
            plugin_data["body"] = instance.body if hasattr(instance, "body") else ""

        elif plugin.plugin_type in ["LinkPlugin", "Bootstrap4Link"]:
            # Link plugin (standard or Bootstrap4) - export full URL (even for internal links)
            plugin_data["name"] = instance.name if hasattr(instance, "name") else ""

            # Get the full URL
            full_url = None

            # Check if it's an internal link (AbstractLink uses 'internal_link')
            if hasattr(instance, "internal_link") and instance.internal_link:
                # Internal link: get the full URL of the page directly
                try:
                    full_url = instance.internal_link.get_absolute_url()
                except Exception:
                    pass

            # If no internal link, use external link
            if not full_url:
                if hasattr(instance, "external_link") and instance.external_link:
                    full_url = instance.external_link
                elif hasattr(instance, "url") and instance.url:
                    full_url = instance.url

            # Export the full URL
            plugin_data["url"] = full_url or ""
            plugin_data["external"] = (
                instance.external if hasattr(instance, "external") else False
            )
            plugin_data["target"] = (
                instance.target if hasattr(instance, "target") else ""
            )

        elif plugin.plugin_type in ["PicturePlugin", "FilerImagePlugin", "Bootstrap4Picture"]:
            # Image plugin (djangocms_picture, filer or Bootstrap4)
            image = None

            # Try to get the image according to plugin type
            if hasattr(instance, "picture") and instance.picture:
                image = instance.picture
            elif hasattr(instance, "image") and instance.image:
                image = instance.image

            if image:
                # Export filer image information
                plugin_data["image"] = {
                    "file_path": image.file.name if hasattr(image, "file") else None,
                    "original_filename": (
                        image.original_filename
                        if hasattr(image, "original_filename")
                        else None
                    ),
                    "filer_id": image.pk,
                }

            plugin_data["alt"] = instance.alt if hasattr(instance, "alt") else ""
            plugin_data["caption"] = (
                instance.caption if hasattr(instance, "caption") else ""
            )

        elif plugin.plugin_type in ["Bootstrap4PicturePlugin"]:
            # Bootstrap4 image plugin - export image information
            if hasattr(instance, "picture") and instance.picture:
                image = instance.picture
                plugin_data["image"] = {
                    "file_path": image.file.name if hasattr(image, "file") else None,
                    "original_filename": (
                        image.original_filename
                        if hasattr(image, "original_filename")
                        else None
                    ),
                    "filer_id": image.pk,
                }

            # Also export other attributes
            plugin_data["attributes"] = {}
            for attr in dir(instance):
                if attr.startswith("_") or attr.endswith("_plugin") or attr.endswith("_plugins") or attr == "page":
                    continue
                try:
                    if callable(getattr(instance, attr)):
                        continue
                    value = getattr(instance, attr)
                    # Do not export complex objects
                    if isinstance(value, (str, int, float, bool, type(None))):
                        plugin_data["attributes"][attr] = value
                except Exception:
                    # Ignore attributes that cause errors (undefined relations, etc.)
                    pass

        elif plugin.plugin_type in ["Bootstrap4LinkPlugin"]:
            # Bootstrap4 link plugin - export full URL (even for internal links)
            plugin_data["name"] = instance.name if hasattr(instance, "name") else ""

            # Get the full URL
            full_url = None

            # Check if it's an internal link (internal_link is a ForeignKey to Page)
            if hasattr(instance, "internal_link") and instance.internal_link:
                # Internal link: get the full URL of the page directly
                try:
                    full_url = instance.internal_link.get_absolute_url()
                except Exception:
                    pass

            # If no internal link, use external link
            if not full_url:
                if hasattr(instance, "external_link") and instance.external_link:
                    full_url = instance.external_link
                elif hasattr(instance, "url") and instance.url:
                    full_url = instance.url

            # Export the full URL
            plugin_data["url"] = full_url or ""

            # Also export other attributes for reference
            plugin_data["attributes"] = {}
            for attr in dir(instance):
                if attr.startswith("_") or attr.endswith("_plugin") or attr.endswith("_plugins") or attr == "page":
                    continue
                try:
                    if callable(getattr(instance, attr)):
                        continue
                    value = getattr(instance, attr)
                    # Do not export complex objects
                    if isinstance(value, (str, int, float, bool, type(None))):
                        plugin_data["attributes"][attr] = value
                except Exception:
                    # Ignore attributes that cause errors (undefined relations, etc.)
                    pass

            # Also export target
            plugin_data["target"] = instance.target if hasattr(instance, "target") else ""

        else:
            # For other plugin types, export basic attributes
            plugin_data["attributes"] = {}
            for attr in dir(instance):
                if not attr.startswith("_") and attr not in [
                    "page",
                    "aldryn_people_peopleplugin",
                    "bootstrap4_card_bootstrap4card",
                    "bootstrap4_card_bootstrap4cardinner",
                    "bootstrap4_carousel_bootstrap4carousel",
                    "bootstrap4_carousel_bootstrap4carouselslide",
                    "bootstrap4_grid_bootstrap4gridcolumn",
                    "bootstrap4_grid_bootstrap4gridcontainer",
                    "bootstrap4_grid_bootstrap4gridrow",
                    "bootstrap4_link_bootstrap4link",
                    "cms_aliasplugin",
                    "cms_placeholderreference",
                    "cmsplugin_slideshow_slideshow",
                    "cmsplugin_slideshow_slideshowslide",
                    "djangocms_call_to_action_ctapluginsettings",
                    "djangocms_file_file",
                    "djangocms_file_folder",
                    "djangocms_googlemap_googlemap",
                    "djangocms_googlemap_googlemapmarker",
                    "djangocms_googlemap_googlemaproute",
                    "djangocms_htmlsitemap_htmlsitemappluginconf",
                    "djangocms_icon_icon",
                    "djangocms_link_link",
                    "djangocms_modules_moduleplugin",
                    "djangocms_picture_picture",
                    "djangocms_snippet_snippetptr",
                    "djangocms_style_style",
                    "djangocms_text_ckeditor_text",
                    "djangocms_video_videoplayer",
                    "djangocms_video_videosource",
                    "djangocms_video_videotrack",
                    "fobi_contrib_apps_djangocms_integration_fobiformwidget",
                    "bootstrap4_picture_bootstrap4picture",
                ]:
                    # Ignore relations that may cause problems
                    if attr.endswith("_plugin") or attr.endswith("_plugins"):
                        continue
                    try:
                        if callable(getattr(instance, attr)):
                            continue
                        value = getattr(instance, attr)
                        # Do not export complex objects
                        if isinstance(value, (str, int, float, bool, type(None))):
                            plugin_data["attributes"][attr] = value
                    except Exception:
                        # Ignore attributes that cause errors (undefined relations, etc.)
                        pass

        # Export child plugins
        child_plugins = CMSPlugin.objects.filter(
            parent=plugin
        ).order_by("position")

        if child_plugins.exists():
            plugin_data["children"] = []
            for child in child_plugins:
                child_data = self.export_plugin_recursive(child)
                if child_data:
                    plugin_data["children"].append(child_data)

        return plugin_data

import_blog_content.py

#!/usr/bin/env python
"""
Script to import content from the "content" placeholder into blog posts
for Django CMS 5 (djangocms-stories)

Usage:
    python manage.py import_blog_content input.json
"""
# Standard Library
import json
import re

# Django
from django.core.files.base import ContentFile
from django.core.management.base import BaseCommand

# Third party
import requests

try:
    # Third party
    from djangocms_stories.models import PostContent

    DJANGOCMS_STORIES_AVAILABLE = True
except ImportError:
    DJANGOCMS_STORIES_AVAILABLE = False

# Django
from django.contrib.contenttypes.models import ContentType

# Third party
from cms.api import add_plugin
from cms.models import CMSPlugin, Placeholder

try:
    # Third party
    from djangocms_versioning.constants import DRAFT, PUBLISHED
    from djangocms_versioning.models import Version

    VERSIONING_AVAILABLE = True
except ImportError:
    VERSIONING_AVAILABLE = False
    DRAFT = None
    PUBLISHED = None

# Third party
from cms.plugin_pool import plugin_pool

try:
    # Third party
    from filer.models import Folder
    from filer.models import Image as FilerImage

    FILER_AVAILABLE = True
except ImportError:
    FILER_AVAILABLE = False


class Command(BaseCommand):
    help = "Import content from the 'content' placeholder into blog posts"

    def add_arguments(self, parser):
        parser.add_argument(
            "input_file", type=str, help="Path to the input JSON file"
        )
        parser.add_argument(
            "--dry-run",
            action="store_true",
            help="Dry run mode (does not make any modifications)",
        )
        parser.add_argument(
            "--clear-existing",
            action="store_true",
            help="Delete existing content before import",
        )
        parser.add_argument(
            "--site-url",
            type=str,
            default="https://www.example.com",
            help="Public site URL (old site) to download images from",
        )

    def handle(self, *args, **options):
        if not DJANGOCMS_STORIES_AVAILABLE:
            self.stdout.write(
                self.style.ERROR("djangocms-stories is not available")
            )
            return

        input_file = options["input_file"]
        dry_run = options["dry_run"]
        clear_existing = options["clear_existing"]
        self.verbosity = options.get("verbosity", 1)
        self.site_url = options.get("site_url", "https://www.example.com")

        # Create or retrieve the "blog" folder in filer
        self.blog_folder = None
        if FILER_AVAILABLE and not dry_run:
            self.blog_folder, created = Folder.objects.get_or_create(name="blog")
            if created:
                self.stdout.write(self.style.SUCCESS("Folder 'blog' created in filer"))

        # Load JSON
        with open(input_file, encoding="utf-8") as f:
            blog_posts = json.load(f)

        imported_count = 0
        not_found_count = 0
        error_count = 0

        for post_data in blog_posts:
            slug = post_data.get("slug")
            if not slug:
                continue

            try:
                # Find post by slug
                post_content = PostContent.objects.filter(
                    slug=slug, language="fr"
                ).first()
                if not post_content:
                    if self.verbosity >= 2:
                        self.stdout.write(
                            self.style.WARNING(f"Post with slug '{slug}' not found")
                        )
                    not_found_count += 1
                    continue

                # Check that use_placeholder is enabled
                if not post_content.post.app_config.use_placeholder:
                    if self.verbosity >= 2:
                        self.stdout.write(
                            self.style.WARNING(
                                f"use_placeholder is not enabled for '{slug}', ignored"
                            )
                        )
                    not_found_count += 1
                    continue

                # Handle djangocms-versioning BEFORE getting the placeholder
                # Because each version has its own placeholder
                draft_version = None
                versioned_content = (
                    post_content  # By default, use the original content
                )
                if VERSIONING_AVAILABLE:
                    content_type = ContentType.objects.get_for_model(PostContent)
                    published_version = Version.objects.filter(
                        content_type=content_type,
                        object_id=post_content.pk,
                        state=PUBLISHED,
                    ).first()
                    draft_version = Version.objects.filter(
                        content_type=content_type,
                        object_id=post_content.pk,
                        state=DRAFT,
                    ).first()

                    # Skipped in dry-run mode so that no new version is created
                    if not draft_version and published_version and not dry_run:
                        # Create a draft version by copying the published version
                        # Django
                        from django.contrib.auth import get_user_model

                        User = get_user_model()
                        user = User.objects.first()
                        if user:
                            try:
                                draft_version = published_version.copy(created_by=user)
                                if self.verbosity >= 2:
                                    self.stdout.write(
                                        self.style.SUCCESS(
                                            f"Draft version created for '{slug}' (number={draft_version.number})"
                                        )
                                    )
                                # IMPORTANT: After copying, get the content from the draft version
                                # to ensure we're working with the correct placeholder
                                versioned_content = draft_version.content
                                # Delete copied plugins from the draft version if clear_existing
                                if clear_existing and not dry_run:
                                    draft_placeholder = getattr(
                                        versioned_content, "content", None
                                    )
                                    if not draft_placeholder:
                                        draft_placeholder = (
                                            versioned_content.placeholders.filter(
                                                slot="content"
                                            ).first()
                                        )
                                    if draft_placeholder:
                                        deleted_count = CMSPlugin.objects.filter(
                                            placeholder=draft_placeholder, language="fr"
                                        ).delete()[0]
                                        if self.verbosity >= 2:
                                            self.stdout.write(
                                                f"Copied plugins deleted from draft: {deleted_count}"
                                            )
                            except Exception as e:
                                self.stdout.write(
                                    self.style.WARNING(
                                        f"Unable to create draft version for '{slug}': {e}"
                                    )
                                )

                    # Use the draft version content if it exists
                    if draft_version:
                        # Get content from the draft version
                        versioned_content = draft_version.content
                        if self.verbosity >= 2:
                            self.stdout.write(
                                f"Using draft version content (ID: {versioned_content.pk})"
                            )

                # Get the "content" placeholder from the versioned content
                placeholder = getattr(versioned_content, "content", None)
                if not placeholder:
                    placeholder = versioned_content.placeholders.filter(
                        slot="content"
                    ).first()
                if not placeholder:
                    placeholder = Placeholder.objects.create(slot="content")
                    versioned_content.placeholders.add(placeholder)
                    if hasattr(versioned_content, "content"):
                        versioned_content.content = placeholder
                        versioned_content.save()

                # Log to verify which placeholder is used
                if self.verbosity >= 2:
                    if draft_version:
                        self.stdout.write(
                            f"Placeholder used: ID {placeholder.pk} (draft version)"
                        )
                    else:
                        self.stdout.write(
                            f"Placeholder used: ID {placeholder.pk} (published version or no versioning)"
                        )

                # Delete existing plugins if necessary
                # (Note: If draft_version was just created, plugins have already been deleted above)
                if clear_existing and not dry_run:
                    if draft_version:
                        # Check if there are remaining plugins to delete
                        plugin_count_before = CMSPlugin.objects.filter(
                            placeholder=placeholder, language="fr"
                        ).count()
                        if plugin_count_before > 0:
                            CMSPlugin.objects.filter(
                                placeholder=placeholder, language="fr"
                            ).delete()
                            if self.verbosity >= 2:
                                self.stdout.write(
                                    f"Existing plugins deleted from draft version placeholder: {plugin_count_before}"
                                )
                    else:
                        # No versioning, delete normally
                        CMSPlugin.objects.filter(
                            placeholder=placeholder, language="fr"
                        ).delete()

                # Import plugins
                if not dry_run:
                    plugins_data = post_data.get("plugins", [])
                    imported_plugins = self.import_plugins(
                        placeholder, plugins_data, language="fr", post_slug=slug
                    )

                    # Verify that plugins are in the placeholder
                    if self.verbosity >= 2:
                        plugin_count = CMSPlugin.objects.filter(
                            placeholder=placeholder, language="fr"
                        ).count()
                        self.stdout.write(
                            f"Plugins created in placeholder {placeholder.pk}: {plugin_count}"
                        )
                        if draft_version:
                            self.stdout.write(
                                f" (draft version #{draft_version.number})"
                            )

                    self.stdout.write(
                        self.style.SUCCESS(
                            f"Imported {imported_plugins} plugins for '{slug}'"
                        )
                    )

                    # Automatically publish the draft version
                    if VERSIONING_AVAILABLE and draft_version and not dry_run:
                        try:
                            # Django
                            from django.contrib.auth import get_user_model

                            User = get_user_model()
                            user = User.objects.first()
                            if user:
                                draft_version.publish(user)
                                if self.verbosity >= 1:
                                    self.stdout.write(
                                        self.style.SUCCESS(
                                            f"Version published for '{slug}'"
                                        )
                                    )
                        except Exception as e:
                            self.stdout.write(
                                self.style.WARNING(
                                    f"Unable to publish version for '{slug}': {e}"
                                )
                            )
                else:
                    self.stdout.write(
                        f"[DRY-RUN] Would import {len(post_data.get('plugins', []))} plugins for '{slug}'"
                    )

                imported_count += 1

            except Exception as e:
                error_count += 1
                self.stdout.write(
                    self.style.ERROR(f"Error importing '{slug}': {e}")
                )
                if self.verbosity >= 2:
                    # Standard Library
                    import traceback

                    self.stdout.write(traceback.format_exc())

        self.stdout.write(
            self.style.SUCCESS(
                f"\nImport completed: {imported_count} posts imported, "
                f"{not_found_count} not found, {error_count} errors"
            )
        )

    def import_plugins(
        self, placeholder, plugins_data, language="fr", parent=None, post_slug=None
    ):
        """
        Recursively import plugins into a placeholder.
        For TextPlugin, replace IDs in the body with the new IDs of created plugins.
        """
        imported_count = 0
        # Mapping of old IDs to new IDs to replace in the body
        id_mapping = {}

        for _idx, plugin_data in enumerate(plugins_data):
            plugin_type = plugin_data.get("plugin_type")
            old_id = plugin_data.get("id") or plugin_data.get("attributes", {}).get(
                "id"
            )

            # Unsupported plugins - ignore but process children
            unsupported_plugins = [
                "Bootstrap4GridContainerPlugin",
                "Bootstrap4GridRowPlugin",
                "Bootstrap4GridColumnPlugin",
            ]
            if plugin_type in unsupported_plugins:
                if "children" in plugin_data and plugin_data["children"]:
                    child_count = self.import_plugins(
                        placeholder,
                        plugin_data["children"],
                        language,
                        parent=parent,
                        post_slug=post_slug,
                    )
                    imported_count += child_count
                continue

            # Plugin type mapping
            plugin_type_mapping = {
                "FilerImagePlugin": "ImagePlugin",
                "Bootstrap4PicturePlugin": "ImagePlugin",
                "Bootstrap4LinkPlugin": "LinkPlugin",
                "Bootstrap4Link": "LinkPlugin",
                "Bootstrap4Picture": "ImagePlugin",
                "PicturePlugin": "ImagePlugin",  # Also convert PicturePlugin to ImagePlugin
            }
            if plugin_type in plugin_type_mapping:
                plugin_type = plugin_type_mapping[plugin_type]

            # Check that the plugin is available
            try:
                if not plugin_pool.get_plugin(plugin_type):
                    continue
            except KeyError:
                continue

            # Create the plugin
            try:
                plugin = add_plugin(
                    placeholder=placeholder,
                    plugin_type=plugin_type,
                    language=language,
                    position="last-child",
                    target=parent,
                )
            except Exception as e:
                if self.verbosity >= 2:
                    self.stdout.write(
                        self.style.WARNING(
                            f"Error creating plugin {plugin_type}: {e}"
                        )
                    )
                continue

            # Get the instance
            instance, plugin_class = plugin.get_plugin_instance()
            if not instance:
                continue

            # Store ID mapping if we have an old ID
            if old_id:
                id_mapping[str(old_id)] = str(plugin.pk)
# Fill data according to type
if plugin_type == "TextPlugin":
if self.verbosity >= 1:
self.stdout.write(
f" - importing text plugin (ID {plugin.pk})...", ending=""
)
body = plugin_data.get("body", "")
# Create children first to get their new IDs
if "children" in plugin_data and plugin_data["children"]:
if self.verbosity >= 1:
self.stdout.write(" ok!")
self.stdout.write(
f" - importing sub-plugins ({len(plugin_data['children'])} children)..."
)
child_mapping = {}
# Extract IDs in body order
# Standard Library
import re
ids_in_body = re.findall(r'<cms-plugin[^>]*id="(\d+)"[^>]*>', body)
if self.verbosity >= 2:
self.stdout.write(
f" IDs found in body: {ids_in_body}"
)
# Create a dictionary to quickly find children by ID
children_by_id = {}
for child_data in plugin_data["children"]:
# Get ID from various possible locations
child_old_id = (
child_data.get("id")
or child_data.get("attributes", {}).get("id")
or child_data.get("attributes", {}).get("cmsplugin_ptr_id")
or child_data.get("pk")
)
if child_old_id:
children_by_id[str(child_old_id)] = child_data
# Create children in the order of IDs in the body
for old_id_str in ids_in_body:
if old_id_str in children_by_id:
child_data = children_by_id[old_id_str]
child_plugin_type = child_data.get("plugin_type", "Unknown")
# Create the child
child_plugin = self._create_child_plugin(
placeholder,
child_data,
language,
parent=plugin,
post_slug=post_slug,
)
if child_plugin:
child_mapping[old_id_str] = str(child_plugin.pk)
# Link and image details are already logged by _create_child_plugin
# at verbosity >= 1, so they are not logged a second time here
if self.verbosity >= 2:
self.stdout.write(
f" Mapping ID {old_id_str} -> {child_plugin.pk} ({child_plugin_type})"
)
# Also create children that are not in the body (just in case)
for child_data in plugin_data["children"]:
child_old_id = (
child_data.get("id")
or child_data.get("attributes", {}).get("id")
or child_data.get("attributes", {}).get("cmsplugin_ptr_id")
or child_data.get("pk")
)
if child_old_id and str(child_old_id) not in child_mapping:
# Create the child
child_plugin = self._create_child_plugin(
placeholder,
child_data,
language,
parent=plugin,
post_slug=post_slug,
)
if child_plugin:
child_mapping[str(child_old_id)] = str(child_plugin.pk)
if self.verbosity >= 2:
self.stdout.write(
f" Mapping ID {child_old_id} -> {child_plugin.pk} ({child_data.get('plugin_type')}) (not in body)"
)
# Replace IDs in the body
if child_mapping:
body = self._replace_cms_plugin_ids(body, child_mapping)
else:
if self.verbosity >= 1:
self.stdout.write(" ok!")
instance.body = body
instance.save()
imported_count += 1
elif plugin_type == "LinkPlugin":
if self.verbosity >= 1:
self.stdout.write(
f" - importing link (ID {plugin.pk})...", ending=""
)
# Get the name
name = plugin_data.get("name") or plugin_data.get("attributes", {}).get(
"name"
)
if name:
instance.name = name
# Use the exported URL as external link (simplification)
# The URL can be either external, or the full URL of an internal link from the old site
# PRIORITY: always use "url" if present (even if empty), do not use fallback
link_url = plugin_data.get("url")
# If "url" is not present at root level, look in attributes.external_link
if link_url is None:
link_url = plugin_data.get("attributes", {}).get("external_link")
# Use the URL as external link (even if it's a relative URL like "/contact/")
if link_url:
instance.link = link_url
# Do NOT use fallback with internal_link_id because IDs don't match between sites
if "target" in plugin_data:
instance.target = plugin_data["target"]
elif (
"attributes" in plugin_data
and "target" in plugin_data["attributes"]
):
instance.target = plugin_data["attributes"]["target"]
instance.save()
if self.verbosity >= 1:
self.stdout.write(f' ok! ("{name}" -> {link_url})')
imported_count += 1
elif plugin_type == "ImagePlugin":
# Image plugin from djangocms-frontend
image_info = plugin_data.get("image", {})
filename = image_info.get("original_filename", "?")
if self.verbosity >= 1:
self.stdout.write(
f" - importing image (ID {plugin.pk}, {filename})...",
ending="",
)
image = self._find_or_download_image(plugin_data, post_slug)
if image:
if self.verbosity >= 1:
self.stdout.write(
f" ok! (image ID {image.pk}: {image.original_filename})"
)
# The Image model uses a config field (JSONField)
# Properties are accessible via __getattr__ which looks in config
# Initialize config if necessary
if not instance.config:
instance.config = {}
# Set the image (use a dictionary with pk and model for compatibility)
instance.config["picture"] = {
"pk": image.pk,
"model": "filer.image",
}
# external_picture must be defined (even if empty) to avoid AttributeError
instance.config["external_picture"] = ""
# Handle size and scaling attributes
# width and height must always be defined (even None) to avoid AttributeError
attrs = plugin_data.get("attributes") or {}
instance.config["width"] = attrs.get("width") or None
instance.config["height"] = attrs.get("height") or None
# Enable automatic scaling only if no size is defined
instance.config["use_automatic_scaling"] = not (
instance.config["width"] or instance.config["height"]
)
# Enable picture_fluid to make the image responsive
instance.config["picture_fluid"] = True
# Fill the remaining fields with defaults, without overwriting existing values
config_defaults = {
# Picture style flags (False by default)
"picture_rounded": False,
"picture_thumbnail": False,
# Other default fields
"use_responsive_image": "inherit",
"lazy_loading": False,
"use_crop": False,
"use_upscale": False,
"use_no_cropping": False,
"alignment": "",
"template": "default",
# Additional fields for compatibility
"attributes": {},
"link": {},
"target": "",
"margin_x": "",
"margin_y": "",
"margin_devices": None,
"responsive_visibility": None,
"thumbnail_options": None,
}
for key, default in config_defaults.items():
instance.config.setdefault(key, default)
instance.save()
imported_count += 1
else:
if self.verbosity >= 1:
self.stdout.write(" ✗ Image not found!")
if self.verbosity >= 2:
self.stdout.write(
self.style.WARNING(
f"Image not found for plugin {plugin.pk}"
)
)
# Process children recursively (except for TextPlugin which handles them already)
if (
plugin_type != "TextPlugin"
and "children" in plugin_data
and plugin_data["children"]
):
child_count = self.import_plugins(
placeholder,
plugin_data["children"],
language,
parent=plugin,
post_slug=post_slug,
)
imported_count += child_count
return imported_count
def _create_child_plugin(
self, placeholder, plugin_data, language, parent=None, post_slug=None
):
"""Create a child plugin and return the new plugin object, or None on failure"""
plugin_type = plugin_data.get("plugin_type")
# Type mapping
plugin_type_mapping = {
"FilerImagePlugin": "ImagePlugin",
"Bootstrap4PicturePlugin": "ImagePlugin",
"Bootstrap4LinkPlugin": "LinkPlugin",
"Bootstrap4Link": "LinkPlugin",
"Bootstrap4Picture": "ImagePlugin",
"PicturePlugin": "ImagePlugin",
}
if plugin_type in plugin_type_mapping:
plugin_type = plugin_type_mapping[plugin_type]
try:
if not plugin_pool.get_plugin(plugin_type):
return None
except KeyError:
return None
try:
plugin = add_plugin(
placeholder=placeholder,
plugin_type=plugin_type,
language=language,
position="last-child",
target=parent,
)
except Exception:
return None
instance, _ = plugin.get_plugin_instance()
if not instance:
return None
# Fill data
if plugin_type == "LinkPlugin":
name = plugin_data.get("name") or plugin_data.get("attributes", {}).get(
"name"
)
if name:
instance.name = name
# PRIORITY: always use "url" if present, do not use fallback with internal_link_id
link_url = plugin_data.get("url")
if link_url is None:
link_url = plugin_data.get("attributes", {}).get("external_link")
if link_url:
instance.link = link_url
# Do NOT use fallback with internal_link_id because IDs don't match between sites
instance.save()
if self.verbosity >= 1:
old_id = (
plugin_data.get("id")
or plugin_data.get("attributes", {}).get("id")
or plugin_data.get("attributes", {}).get("cmsplugin_ptr_id")
or plugin_data.get("pk")
)
self.stdout.write(
f' - importing link id {old_id} -> {plugin.pk} ("{name}" {link_url})... ok'
)
return plugin
elif plugin_type == "ImagePlugin":
# Image plugin from djangocms-frontend
image_info = plugin_data.get("image", {})
filename = image_info.get("original_filename", "?")
old_id = (
plugin_data.get("id")
or plugin_data.get("attributes", {}).get("id")
or plugin_data.get("attributes", {}).get("cmsplugin_ptr_id")
or plugin_data.get("pk")
)
if self.verbosity >= 1:
self.stdout.write(
f" - importing image id {old_id} ({filename})...",
ending="",
)
image = self._find_or_download_image(plugin_data, post_slug)
if image:
if self.verbosity >= 1:
self.stdout.write(
f" ok! (image ID {image.pk}: {image.original_filename})"
)
if not instance.config:
instance.config = {}
instance.config["picture"] = {"pk": image.pk, "model": "filer.image"}
instance.config["external_picture"] = ""
instance.config["width"] = None
instance.config["height"] = None
instance.config["use_automatic_scaling"] = True
instance.config["picture_fluid"] = True
instance.config["picture_rounded"] = False
instance.config["picture_thumbnail"] = False
instance.config["use_responsive_image"] = "inherit"
instance.config["lazy_loading"] = False
instance.config["use_crop"] = False
instance.config["use_upscale"] = False
instance.config["use_no_cropping"] = False
instance.config["alignment"] = ""
instance.config["template"] = "default"
instance.config["attributes"] = {}
instance.config["link"] = {}
instance.config["target"] = ""
instance.config["margin_x"] = ""
instance.config["margin_y"] = ""
instance.config["margin_devices"] = None
instance.config["responsive_visibility"] = None
instance.config["thumbnail_options"] = None
instance.save()
return plugin
else:
if self.verbosity >= 1:
self.stdout.write(" ✗ Image not found!")
return None
# Function level: any other plugin type has no extra data to fill.
# Return the created plugin so its new ID can still be mapped and it is not created twice.
return plugin
def _replace_cms_plugin_ids(self, body, id_mapping):
"""Replace IDs in <cms-plugin> tags with new IDs"""
if not id_mapping:
return body
def replace_id(match):
old_id = match.group(1)
new_id = id_mapping.get(old_id)
if new_id:
# Replace the ID in the tag
return match.group(0).replace(f'id="{old_id}"', f'id="{new_id}"')
else:
# If ID is not in mapping, display a warning
if self.verbosity >= 2:
self.stdout.write(
self.style.WARNING(
f"ID {old_id} not found in mapping. Available mapping: {list(id_mapping.keys())}"
)
)
return match.group(0)
# Pattern to find <cms-plugin ... id="XXX" ...>
pattern = r'<cms-plugin[^>]*id="(\d+)"[^>]*>'
return re.sub(pattern, replace_id, body)
def _find_or_download_image(self, plugin_data, post_slug):
"""Find an existing filer image matching the exported data, or download it from the old site"""
if not FILER_AVAILABLE or not self.blog_folder:
return None
image = None
# Method 1: Directly exported data
# IMPORTANT: Do not use filer_id first because IDs don't match between sites
# Prioritize file_path and original_filename which are more reliable
if "image" in plugin_data:
image_info = plugin_data["image"]
file_path = image_info.get("file_path")
original_filename = image_info.get("original_filename")
filer_id = image_info.get("filer_id")
if self.verbosity >= 2:
self.stdout.write(" find_or_download_image:")
self.stdout.write(f" file_path: {file_path}")
self.stdout.write(f" original_filename: {original_filename}")
self.stdout.write(f" filer_id: {filer_id}")
# Priority 1: Search by file_path (most reliable)
if file_path:
# Try to find by full path or filename
filename_from_path = file_path.split("/")[-1]
if self.verbosity >= 2:
self.stdout.write(
f" Search by filename_from_path: {filename_from_path}"
)
# Search by filename in the path
image = FilerImage.objects.filter(
file__icontains=filename_from_path
).first()
if image:
# Verify that the file really exists
file_exists = False
check_method = None
if hasattr(image.file, "storage") and hasattr(
image.file.storage, "exists"
):
file_exists = image.file.storage.exists(image.file.name)
check_method = "storage.exists"
elif hasattr(image.file, "path"):
# Standard Library
import os
file_exists = os.path.exists(image.file.path)
check_method = "os.path.exists"
if self.verbosity >= 2:
self.stdout.write(
f" File existence check (method: {check_method}): {file_exists}"
)
if file_exists:
if self.verbosity >= 2:
self.stdout.write(
f" ✓ Found by filename! (ID {image.pk}: {image.file.name})"
)
return image
else:
if self.verbosity >= 1:
self.stdout.write(
self.style.WARNING(
f" ✗ Image found (ID {image.pk}) but file does not exist: {image.file.name}"
)
)
# Don't return this image, continue to download
image = None
# Also search by partial path (without filer_public prefix)
if not image:
path_parts = file_path.split("/")
if len(path_parts) >= 3:
# Take the last 2 segments (e.g.: ed/63/ed636ad9...)
search_path = "/".join(path_parts[-2:])
if self.verbosity >= 2:
self.stdout.write(
f" Search by partial path: {search_path}"
)
candidate = FilerImage.objects.filter(
file__icontains=search_path
).first()
if candidate:
# Verify that the file really exists
file_exists = False
if hasattr(candidate.file, "storage") and hasattr(
candidate.file.storage, "exists"
):
file_exists = candidate.file.storage.exists(
candidate.file.name
)
elif hasattr(candidate.file, "path"):
# Standard Library
import os
file_exists = os.path.exists(candidate.file.path)
if file_exists:
image = candidate
if self.verbosity >= 2:
self.stdout.write(
f" ✓ Found by partial path! (ID {image.pk}: {image.file.name})"
)
return image
else:
if self.verbosity >= 2:
self.stdout.write(
f" ✗ Image found (ID {candidate.pk}) but file does not exist: {candidate.file.name}"
)
# Priority 2: Search by original_filename
if not image and original_filename:
if self.verbosity >= 2:
self.stdout.write(
f" Search by original_filename: {original_filename}"
)
candidate = FilerImage.objects.filter(
original_filename=original_filename
).first()
if candidate:
# Verify that the file really exists
file_exists = False
if hasattr(candidate.file, "storage") and hasattr(
candidate.file.storage, "exists"
):
file_exists = candidate.file.storage.exists(candidate.file.name)
elif hasattr(candidate.file, "path"):
# Standard Library
import os
file_exists = os.path.exists(candidate.file.path)
if file_exists:
image = candidate
if self.verbosity >= 2:
self.stdout.write(
f" ✓ Found by original_filename! (ID {image.pk}: {image.file.name})"
)
return image
else:
if self.verbosity >= 2:
self.stdout.write(
f" ✗ Image found (ID {candidate.pk}) but file does not exist: {candidate.file.name}"
)
# Priority 3: Use filer_id only as last resort
# (but verify that the found image matches file_path or original_filename)
if not image and filer_id:
if self.verbosity >= 2:
self.stdout.write(f" Search by filer_id: {filer_id}")
try:
candidate_image = FilerImage.objects.get(pk=filer_id)
# Verify that it's the right image
matches = False
if file_path:
filename_from_path = file_path.split("/")[-1]
if (
filename_from_path.lower()
in candidate_image.file.name.lower()
):
matches = True
if self.verbosity >= 2:
self.stdout.write(
f" ✓ Found by filer_id and verified by file_path! (ID {candidate_image.pk}: {candidate_image.file.name})"
)
if not matches and original_filename:
if candidate_image.original_filename == original_filename:
matches = True
if self.verbosity >= 2:
self.stdout.write(
f" ✓ Found by filer_id and verified by original_filename! (ID {candidate_image.pk}: {candidate_image.original_filename})"
)
if matches:
return candidate_image
# If no match, don't use this image and continue to download
if self.verbosity >= 2:
self.stdout.write(
f" ✗ Image ID {filer_id} found but does not match (file: {candidate_image.file.name}, original: {candidate_image.original_filename})"
)
if self.verbosity >= 2:
self.stdout.write(
" → Continuing to download the correct image"
)
# Don't return the image, continue to download
except FilerImage.DoesNotExist:
if self.verbosity >= 2:
self.stdout.write(
f" ✗ Image ID {filer_id} not found"
)
# Method 2: For converted Bootstrap4PicturePlugin
# DO NOT use this method because picture_id corresponds to filer_id which doesn't match between sites
# This method is disabled to avoid using the wrong image
# if not image and "attributes" in plugin_data:
# attrs = plugin_data["attributes"]
# picture_id = attrs.get("picture_id")
# if picture_id:
# try:
# image = FilerImage.objects.get(pk=picture_id)
# return image
# except FilerImage.DoesNotExist:
# pass
# Method 3: Download from media server if we have a file_path
if not image and "image" in plugin_data:
image_info = plugin_data["image"]
file_path = image_info.get("file_path")
original_filename = image_info.get("original_filename")
if file_path:
# Strip any trailing slash from site_url to avoid a double slash in the URL
media_url = f"{self.site_url.rstrip('/')}/media/{file_path.lstrip('/')}"
if self.verbosity >= 2:
self.stdout.write(f" Downloading from: {media_url}")
image = self._download_image_from_url(
media_url, original_filename or file_path.split("/")[-1]
)
if image:
if self.verbosity >= 1:
self.stdout.write(
f" ✓ Image downloaded! (ID {image.pk}: {image.original_filename})"
)
if self.verbosity >= 2:
# Verify that the file exists after download
file_exists = False
if hasattr(image.file, "storage") and hasattr(
image.file.storage, "exists"
):
file_exists = image.file.storage.exists(image.file.name)
elif hasattr(image.file, "path"):
# Standard Library
import os
file_exists = os.path.exists(image.file.path)
if file_exists:
self.stdout.write(
f" ✓ File verified after download: {image.file.name}"
)
else:
self.stdout.write(
self.style.ERROR(
f" ✗ WARNING: File downloaded but not found: {image.file.name}"
)
)
else:
if self.verbosity >= 1:
self.stdout.write(
self.style.ERROR(" ✗ Download failed")
)
else:
if self.verbosity >= 2:
self.stdout.write(
" ✗ No file_path, cannot download"
)
if not image and self.verbosity >= 2:
self.stdout.write(
" ✗ Image not found after all attempts"
)
return image
def _download_image_from_url(self, url, filename):
"""Download an image from a URL and store it as a filer image in the blog folder (None on failure)"""
if not FILER_AVAILABLE:
if self.verbosity >= 2:
self.stdout.write(
self.style.WARNING(
"FILER_AVAILABLE is False, cannot download"
)
)
return None
if not self.blog_folder:
if self.verbosity >= 2:
self.stdout.write(
self.style.WARNING(
"blog_folder is not defined, cannot download"
)
)
return None
try:
if self.verbosity >= 2:
self.stdout.write(
f" Starting download from {url}..."
)
response = requests.get(url, timeout=30)
response.raise_for_status()
if self.verbosity >= 2:
self.stdout.write(
f" Response received: {len(response.content)} bytes"
)
# Verify that it's really an image
content_type = response.headers.get("content-type", "")
if self.verbosity >= 2:
self.stdout.write(f" Content-Type: {content_type}")
if not content_type.startswith("image/"):
if self.verbosity >= 2:
self.stdout.write(
self.style.WARNING(
f"Content is not an image (content-type: {content_type})"
)
)
return None
# Create the file in filer
if self.verbosity >= 2:
self.stdout.write(
f" Creating ContentFile with name: {filename}"
)
image_file = ContentFile(response.content, name=filename)
if self.verbosity >= 2:
self.stdout.write(
f" Creating FilerImage in folder: {self.blog_folder.name} (ID: {self.blog_folder.pk})"
)
image = FilerImage.objects.create(
folder=self.blog_folder,
file=image_file,
original_filename=filename,
)
if self.verbosity >= 2:
self.stdout.write(
f" FilerImage created successfully (ID: {image.pk}, file.name: {image.file.name})"
)
# Verify that the file really exists
if hasattr(image.file, "storage") and hasattr(image.file.storage, "exists"):
if not image.file.storage.exists(image.file.name):
if self.verbosity >= 2:
self.stdout.write(
self.style.ERROR(
f"WARNING: File {image.file.name} does not exist in storage!"
)
)
else:
if self.verbosity >= 2:
self.stdout.write(
f" ✓ File verified in storage: {image.file.name}"
)
return image
except requests.exceptions.RequestException as e:
if self.verbosity >= 1:
self.stdout.write(
self.style.ERROR(
f" ✗ HTTP error downloading {url}: {e}"
)
)
return None
except Exception as e:
if self.verbosity >= 1:
self.stdout.write(
self.style.ERROR(
f" ✗ Error downloading {url}: {type(e).__name__}: {e}"
)
)
# Standard Library
import traceback
if self.verbosity >= 2:
self.stdout.write(traceback.format_exc())
return None
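
The ID remapping done by `_replace_cms_plugin_ids` can be tried outside Django. Below is a minimal standalone sketch that reuses the same regular expression. The function name `replace_cms_plugin_ids`, the `CMS_PLUGIN_TAG` constant, and the sample body are hypothetical and only serve as illustration; they are not part of the import script.

```python
import re

# Same pattern as in the import script: capture the numeric id of a <cms-plugin> tag.
CMS_PLUGIN_TAG = re.compile(r'<cms-plugin[^>]*id="(\d+)"[^>]*>')


def replace_cms_plugin_ids(body, id_mapping):
    """Rewrite the id of each <cms-plugin> tag using id_mapping (old id -> new id, as strings)."""

    def replace_id(match):
        old_id = match.group(1)
        new_id = id_mapping.get(old_id)
        if new_id is None:
            # Unknown ID: leave the tag untouched
            return match.group(0)
        return match.group(0).replace(f'id="{old_id}"', f'id="{new_id}"')

    return CMS_PLUGIN_TAG.sub(replace_id, body)


# Hypothetical text plugin body with one mapped and one unmapped child plugin
body = (
    '<p>See <cms-plugin alt="Link" id="12"></cms-plugin> '
    'and <cms-plugin id="99"></cms-plugin></p>'
)
print(replace_cms_plugin_ids(body, {"12": "340"}))
```

Only the tag whose old ID appears in the mapping is rewritten; tags with unknown IDs are returned unchanged, which matches the fallback branch in the script.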