|
#!/usr/bin/env python |
|
""" |
|
Script to import content from the "content" placeholder into blog posts |
|
for Django CMS 5 (djangocms-stories) |
|
|
|
Usage: |
|
python manage.py import_blog_content input.json |
|
""" |
|
|
|
# Standard Library |
|
import json |
|
import re |
|
|
|
# Django |
|
from django.core.files.base import ContentFile |
|
from django.core.management.base import BaseCommand |
|
|
|
# Third party |
|
import requests |
|
|
|
try: |
|
# Third party |
|
from djangocms_stories.models import PostContent |
|
|
|
DJANGOCMS_STORIES_AVAILABLE = True |
|
except ImportError: |
|
DJANGOCMS_STORIES_AVAILABLE = False |
|
|
|
# Django |
|
from django.contrib.contenttypes.models import ContentType |
|
|
|
# Third party |
|
from cms.api import add_plugin |
|
from cms.models import CMSPlugin, Placeholder |
|
|
|
try: |
|
# Third party |
|
from djangocms_versioning.constants import DRAFT, PUBLISHED |
|
from djangocms_versioning.models import Version |
|
|
|
VERSIONING_AVAILABLE = True |
|
except ImportError: |
|
VERSIONING_AVAILABLE = False |
|
DRAFT = None |
|
PUBLISHED = None |
|
# Third party |
|
from cms.plugin_pool import plugin_pool |
|
|
|
try: |
|
# Third party |
|
from filer.models import Folder |
|
from filer.models import Image as FilerImage |
|
|
|
FILER_AVAILABLE = True |
|
except ImportError: |
|
FILER_AVAILABLE = False |
|
|
|
|
|
class Command(BaseCommand): |
|
help = "Import content from the 'content' placeholder into blog posts" |
|
|
|
def add_arguments(self, parser):
    """Declare the command-line interface of the import command.

    Registers, in this order, the positional ``input_file`` argument and
    the ``--dry-run``, ``--clear-existing`` and ``--site-url`` options.

    Args:
        parser: The argument parser handed over by Django.
    """
    parser.add_argument("input_file", type=str, help="Path to the input JSON file")

    # Both switches are plain booleans that are False unless given.
    switches = (
        ("--dry-run", "Dry run mode (does not make any modifications)"),
        ("--clear-existing", "Delete existing content before import"),
    )
    for flag, description in switches:
        parser.add_argument(flag, action="store_true", help=description)

    parser.add_argument(
        "--site-url",
        type=str,
        default="https://www.example.com",
        help="Public site URL (old site) to download images from",
    )
|
|
|
def handle(self, *args, **options):
    """Import the plugins listed in the input JSON file into blog posts.

    The JSON file is iterated as a sequence of dicts; each one is matched
    by its ``slug`` to a French (``language="fr"``) ``PostContent`` and is
    processed independently. An exception raised for one post is reported
    and counted, then the loop moves on to the next post. A summary line
    with the imported / not found / error counts is written at the end.

    With ``--dry-run`` nothing is written to the database: no filer folder,
    draft version or placeholder is created and matching posts are only
    reported.

    Args:
        *args: Unused.
        **options: Parsed command-line options (``input_file``,
            ``dry_run``, ``clear_existing``, ``verbosity``, ``site_url``).
    """
    if not DJANGOCMS_STORIES_AVAILABLE:
        self.stdout.write(self.style.ERROR("djangocms-stories is not available"))
        return

    input_file = options["input_file"]
    dry_run = options["dry_run"]
    clear_existing = options["clear_existing"]
    self.verbosity = options.get("verbosity", 1)
    self.site_url = options.get("site_url", "https://www.example.com")

    # Create or retrieve the "blog" folder in filer (never in dry-run mode).
    self.blog_folder = None
    if FILER_AVAILABLE and not dry_run:
        self.blog_folder, created = Folder.objects.get_or_create(name="blog")
        if created:
            self.stdout.write(self.style.SUCCESS("Folder 'blog' created in filer"))

    with open(input_file, encoding="utf-8") as f:
        blog_posts = json.load(f)

    # Draft creation and publication are attributed to the first user.
    # Look it up once instead of twice per post.
    user = None
    if VERSIONING_AVAILABLE and not dry_run:
        from django.contrib.auth import get_user_model

        user = get_user_model().objects.first()
        if user is None:
            self.stdout.write(
                self.style.WARNING(
                    "No user found: draft versions cannot be created or published"
                )
            )

    imported_count = 0
    not_found_count = 0
    error_count = 0

    for post_data in blog_posts:
        slug = post_data.get("slug")
        if not slug:
            continue

        try:
            if self._import_post(post_data, slug, user, dry_run, clear_existing):
                imported_count += 1
            else:
                not_found_count += 1
        except Exception as e:
            # One broken post must not abort the whole import.
            error_count += 1
            self.stdout.write(self.style.ERROR(f"Error importing '{slug}': {e}"))
            if self.verbosity >= 2:
                import traceback

                self.stdout.write(traceback.format_exc())

    self.stdout.write(
        self.style.SUCCESS(
            f"\nImport completed: {imported_count} posts imported, "
            f"{not_found_count} not found, {error_count} errors"
        )
    )


def _import_post(self, post_data, slug, user, dry_run, clear_existing):
    """Import the exported plugins of one post.

    Args:
        post_data: The exported dict of the post (its ``plugins`` are read).
        slug: Slug used to look the post up.
        user: User that draft versions are created and published as, or
            None when no user is available.
        dry_run: When true, only report what would be imported.
        clear_existing: When true, delete the French plugins already in the
            target placeholder before importing.

    Returns:
        True when the post was imported (or would be, in dry-run mode),
        False when it was skipped because no post matches the slug or
        because ``use_placeholder`` is disabled for it.
    """
    post_content = PostContent.objects.filter(slug=slug, language="fr").first()
    if not post_content:
        if self.verbosity >= 2:
            self.stdout.write(
                self.style.WARNING(f"Post with slug '{slug}' not found")
            )
        return False

    if not post_content.post.app_config.use_placeholder:
        if self.verbosity >= 2:
            self.stdout.write(
                self.style.WARNING(
                    f"use_placeholder is not enabled for '{slug}', ignored"
                )
            )
        return False

    plugins_data = post_data.get("plugins") or []

    if dry_run:
        # Everything below writes to the database (draft version,
        # placeholder, plugins), so a dry run stops here.
        self.stdout.write(
            f"[DRY-RUN] Would import {len(plugins_data)} plugins for '{slug}'"
        )
        return True

    # Resolve the version BEFORE the placeholder: each version has its own.
    draft_version = None
    versioned_content = post_content
    if VERSIONING_AVAILABLE:
        draft_version = self._get_or_create_draft(
            post_content, slug, user, clear_existing
        )
        if draft_version:
            versioned_content = draft_version.content
            if self.verbosity >= 2:
                self.stdout.write(
                    f"Using draft version content (ID: {versioned_content.pk})"
                )

    placeholder = self._content_placeholder(versioned_content)
    if not placeholder:
        placeholder = Placeholder.objects.create(slot="content")
        versioned_content.placeholders.add(placeholder)
        if hasattr(versioned_content, "content"):
            versioned_content.content = placeholder
            versioned_content.save()

    if self.verbosity >= 2:
        if draft_version:
            self.stdout.write(
                f"Placeholder used: ID {placeholder.pk} (draft version)"
            )
        else:
            self.stdout.write(
                f"Placeholder used: ID {placeholder.pk} (published version or no versioning)"
            )

    if clear_existing:
        existing = CMSPlugin.objects.filter(placeholder=placeholder, language="fr")
        if draft_version:
            # A draft that was just copied has already been emptied, so
            # only delete (and report) when something is left.
            plugin_count_before = existing.count()
            if plugin_count_before > 0:
                existing.delete()
                if self.verbosity >= 2:
                    self.stdout.write(
                        f"Existing plugins deleted from draft version placeholder: {plugin_count_before}"
                    )
        else:
            existing.delete()

    imported_plugins = self.import_plugins(
        placeholder, plugins_data, language="fr", post_slug=slug
    )

    if self.verbosity >= 2:
        plugin_count = CMSPlugin.objects.filter(
            placeholder=placeholder, language="fr"
        ).count()
        self.stdout.write(
            f"Plugins created in placeholder {placeholder.pk}: {plugin_count}"
        )
        if draft_version:
            self.stdout.write(f" (draft version #{draft_version.number})")

    self.stdout.write(
        self.style.SUCCESS(f"Imported {imported_plugins} plugins for '{slug}'")
    )

    # Automatically publish the draft version that received the plugins.
    if VERSIONING_AVAILABLE and draft_version and user:
        try:
            draft_version.publish(user)
            if self.verbosity >= 1:
                self.stdout.write(
                    self.style.SUCCESS(f"Version published for '{slug}'")
                )
        except Exception as e:
            self.stdout.write(
                self.style.WARNING(f"Unable to publish version for '{slug}': {e}")
            )

    return True


def _get_or_create_draft(self, post_content, slug, user, clear_existing):
    """Return the draft version to import into, creating one if possible.

    An existing draft version of ``post_content`` is returned as is.
    Otherwise, when a published version and a user are available, the
    published version is copied into a new draft; with ``clear_existing``
    the French plugins copied along with it are deleted right away.

    Returns:
        The draft version, or None when there is none and none could be
        created (the failure is reported as a warning).
    """
    content_type = ContentType.objects.get_for_model(PostContent)
    versions = Version.objects.filter(
        content_type=content_type, object_id=post_content.pk
    )

    draft_version = versions.filter(state=DRAFT).first()
    if draft_version:
        return draft_version

    published_version = versions.filter(state=PUBLISHED).first()
    if not published_version or not user:
        return None

    try:
        draft_version = published_version.copy(created_by=user)
        if self.verbosity >= 2:
            self.stdout.write(
                self.style.SUCCESS(
                    f"Draft version created for '{slug}' (number={draft_version.number})"
                )
            )

        if clear_existing:
            draft_placeholder = self._content_placeholder(draft_version.content)
            if draft_placeholder:
                deleted_count = CMSPlugin.objects.filter(
                    placeholder=draft_placeholder, language="fr"
                ).delete()[0]
                if self.verbosity >= 2:
                    self.stdout.write(
                        f"Copied plugins deleted from draft: {deleted_count}"
                    )
    except Exception as e:
        # Best effort: if the copy itself failed, draft_version is still
        # None and the caller falls back to the original content.
        self.stdout.write(
            self.style.WARNING(f"Unable to create draft version for '{slug}': {e}")
        )

    return draft_version


def _content_placeholder(self, content):
    """Return the "content" placeholder of ``content``, or None if missing.

    The ``content`` attribute is tried first, then the ``placeholders``
    relation filtered on ``slot="content"``.
    """
    placeholder = getattr(content, "content", None)
    if not placeholder:
        placeholder = content.placeholders.filter(slot="content").first()
    return placeholder
|
|
|
def import_plugins(
    self, placeholder, plugins_data, language="fr", parent=None, post_slug=None
):
    """Recursively import exported plugins into a placeholder.

    Layout-only Bootstrap 4 grid plugins are not recreated; their children
    are imported in their place, under the same ``parent``. Legacy plugin
    types are mapped to ``ImagePlugin`` / ``LinkPlugin``. Plugin types that
    are not registered in the plugin pool are skipped together with their
    children.

    For a ``TextPlugin`` the children are created first, so that the
    ``<cms-plugin id="...">`` references in the body can be rewritten to
    the IDs of the newly created plugins.

    Args:
        placeholder: Placeholder receiving the plugins.
        plugins_data: Iterable of exported plugin dicts (may be empty/None).
        language: Language code the plugins are created for.
        parent: Plugin the new plugins are attached to, or None for root.
        post_slug: Slug of the post being imported, passed on to the image
            lookup.

    Returns:
        The number of plugins that were created and filled with data.
        Children of a ``TextPlugin`` are not counted.
    """
    # Unsupported plugins: ignored, but their children are processed.
    unsupported_plugins = (
        "Bootstrap4GridContainerPlugin",
        "Bootstrap4GridRowPlugin",
        "Bootstrap4GridColumnPlugin",
    )
    plugin_type_mapping = {
        "FilerImagePlugin": "ImagePlugin",
        "Bootstrap4PicturePlugin": "ImagePlugin",
        "Bootstrap4LinkPlugin": "LinkPlugin",
        "Bootstrap4Link": "LinkPlugin",
        "Bootstrap4Picture": "ImagePlugin",
        "PicturePlugin": "ImagePlugin",
    }

    imported_count = 0

    for plugin_data in plugins_data or []:
        plugin_type = plugin_data.get("plugin_type")
        # "or" guards against explicit JSON nulls.
        children = plugin_data.get("children") or []
        attrs = plugin_data.get("attributes") or {}

        if plugin_type in unsupported_plugins:
            if children:
                imported_count += self.import_plugins(
                    placeholder,
                    children,
                    language,
                    parent=parent,
                    post_slug=post_slug,
                )
            continue

        plugin_type = plugin_type_mapping.get(plugin_type, plugin_type)

        # Check that the plugin is available
        try:
            if not plugin_pool.get_plugin(plugin_type):
                continue
        except KeyError:
            continue

        try:
            plugin = add_plugin(
                placeholder=placeholder,
                plugin_type=plugin_type,
                language=language,
                position="last-child",
                target=parent,
            )
        except Exception as e:
            if self.verbosity >= 2:
                self.stdout.write(
                    self.style.WARNING(f"Error creating plugin {plugin_type}: {e}")
                )
            continue

        instance, _plugin_class = plugin.get_plugin_instance()
        if not instance:
            continue

        if plugin_type == "TextPlugin":
            if self.verbosity >= 1:
                self.stdout.write(
                    f" - importing text plugin (ID {plugin.pk})...", ending=""
                )
            body = plugin_data.get("body") or ""

            if children:
                if self.verbosity >= 1:
                    self.stdout.write(" ok!")
                    self.stdout.write(
                        f" - importing sub-plugins ({len(children)} children)..."
                    )
                child_mapping = self._import_text_children(
                    placeholder, plugin, children, body, language, post_slug
                )
                if child_mapping:
                    body = self._replace_cms_plugin_ids(body, child_mapping)
            elif self.verbosity >= 1:
                self.stdout.write(" ok!")

            instance.body = body
            instance.save()
            imported_count += 1

        elif plugin_type == "LinkPlugin":
            if self.verbosity >= 1:
                self.stdout.write(
                    f" - importing link (ID {plugin.pk})...", ending=""
                )
            name = plugin_data.get("name") or attrs.get("name")
            if name:
                instance.name = name

            # "url" wins whenever the key is present, even if empty.
            # internal_link_id is never used: IDs don't match between sites.
            link_url = plugin_data.get("url")
            if link_url is None:
                link_url = attrs.get("external_link")
            if link_url:
                instance.link = link_url

            if "target" in plugin_data:
                instance.target = plugin_data["target"]
            elif "target" in attrs:
                instance.target = attrs["target"]

            instance.save()
            if self.verbosity >= 1:
                self.stdout.write(f' ok! ("{name}" -> {link_url})')
            imported_count += 1

        elif plugin_type == "ImagePlugin":
            image_info = plugin_data.get("image") or {}
            filename = image_info.get("original_filename", "?")
            if self.verbosity >= 1:
                self.stdout.write(
                    f" - importing image (ID {plugin.pk}, {filename})...",
                    ending="",
                )
            image = self._find_or_download_image(plugin_data, post_slug)
            if image:
                if self.verbosity >= 1:
                    self.stdout.write(
                        f" ok! (image ID {image.pk}: {image.original_filename})"
                    )
                # The plugin stores its settings in a "config" mapping.
                if not instance.config:
                    instance.config = {}
                config = instance.config

                # width/height/external_picture are always written, even
                # when empty (the original notes this avoids AttributeError).
                width = attrs.get("width") or None
                height = attrs.get("height") or None
                config["picture"] = {"pk": image.pk, "model": "filer.image"}
                config["external_picture"] = ""
                config["width"] = width
                config["height"] = height
                # Automatic scaling only when no explicit size is given.
                config["use_automatic_scaling"] = not width and not height
                config["picture_fluid"] = True

                # Remaining settings only get a default when still unset.
                defaults = {
                    "picture_rounded": False,
                    "picture_thumbnail": False,
                    "use_responsive_image": "inherit",
                    "lazy_loading": False,
                    "use_crop": False,
                    "use_upscale": False,
                    "use_no_cropping": False,
                    "alignment": "",
                    "template": "default",
                    "attributes": {},
                    "link": {},
                    "target": "",
                    "margin_x": "",
                    "margin_y": "",
                    "margin_devices": None,
                    "responsive_visibility": None,
                    "thumbnail_options": None,
                }
                for key, value in defaults.items():
                    config.setdefault(key, value)

                instance.save()
                imported_count += 1
            else:
                # NOTE(review): the plugin created above stays in the
                # placeholder without any configuration.
                if self.verbosity >= 1:
                    self.stdout.write(" ✗ Image not found!")
                if self.verbosity >= 2:
                    self.stdout.write(
                        self.style.WARNING(f"Image not found for plugin {plugin.pk}")
                    )

        # TextPlugin children were handled above; recurse for the rest.
        if plugin_type != "TextPlugin" and children:
            imported_count += self.import_plugins(
                placeholder,
                children,
                language,
                parent=plugin,
                post_slug=post_slug,
            )

    return imported_count


def _import_text_children(
    self, placeholder, text_plugin, children, body, language, post_slug
):
    """Create the child plugins of a text plugin.

    Children referenced in ``body`` are created first, in body order, then
    the remaining children that carry an exported ID. Every child is
    attempted at most once: a child whose creation failed is not retried.
    Progress lines for each child are written by ``_create_child_plugin``.

    Returns:
        Dict mapping each old plugin ID (str) to the new plugin ID (str),
        for the children that were created successfully.
    """
    ids_in_body = re.findall(r'<cms-plugin[^>]*id="(\d+)"[^>]*>', body)
    if self.verbosity >= 2:
        self.stdout.write(f" IDs found in body: {ids_in_body}")

    children_by_id = {}
    for child_data in children:
        child_old_id = self._exported_plugin_id(child_data)
        if child_old_id:
            children_by_id[str(child_old_id)] = child_data

    child_mapping = {}
    attempted = set()

    # Children referenced in the body, in the order of the body.
    for old_id_str in ids_in_body:
        child_data = children_by_id.get(old_id_str)
        if child_data is None or old_id_str in attempted:
            continue
        attempted.add(old_id_str)
        child_plugin = self._create_child_plugin(
            placeholder,
            child_data,
            language,
            parent=text_plugin,
            post_slug=post_slug,
        )
        if child_plugin:
            child_mapping[old_id_str] = str(child_plugin.pk)
            if self.verbosity >= 2:
                child_plugin_type = child_data.get("plugin_type", "Unknown")
                self.stdout.write(
                    f" Mapping ID {old_id_str} -> {child_plugin.pk} ({child_plugin_type})"
                )

    # Children that are not referenced in the body (just in case).
    for child_data in children:
        child_old_id = self._exported_plugin_id(child_data)
        if not child_old_id or str(child_old_id) in attempted:
            continue
        attempted.add(str(child_old_id))
        child_plugin = self._create_child_plugin(
            placeholder,
            child_data,
            language,
            parent=text_plugin,
            post_slug=post_slug,
        )
        if child_plugin:
            child_mapping[str(child_old_id)] = str(child_plugin.pk)
            if self.verbosity >= 2:
                self.stdout.write(
                    f" Mapping ID {child_old_id} -> {child_plugin.pk} ({child_data.get('plugin_type')}) (not in body)"
                )

    return child_mapping


def _exported_plugin_id(self, plugin_data):
    """Return the ID a plugin had in the export, or None when it has none.

    The ID is looked up, in order, in ``id``, ``attributes.id``,
    ``attributes.cmsplugin_ptr_id`` and ``pk``.
    """
    attrs = plugin_data.get("attributes") or {}
    return (
        plugin_data.get("id")
        or attrs.get("id")
        or attrs.get("cmsplugin_ptr_id")
        or plugin_data.get("pk")
    )
|
|
|
def _create_child_plugin( |
|
self, placeholder, plugin_data, language, parent=None, post_slug=None |
|
): |
|
"""Create a child plugin and return the instance""" |
|
plugin_type = plugin_data.get("plugin_type") |
|
|
|
# Type mapping |
|
plugin_type_mapping = { |
|
"FilerImagePlugin": "ImagePlugin", |
|
"Bootstrap4PicturePlugin": "ImagePlugin", |
|
"Bootstrap4LinkPlugin": "LinkPlugin", |
|
"Bootstrap4Link": "LinkPlugin", |
|
"Bootstrap4Picture": "ImagePlugin", |
|
"PicturePlugin": "ImagePlugin", |
|
} |
|
|
|
if plugin_type in plugin_type_mapping: |
|
plugin_type = plugin_type_mapping[plugin_type] |
|
|
|
try: |
|
if not plugin_pool.get_plugin(plugin_type): |
|
return None |
|
except KeyError: |
|
return None |
|
|
|
try: |
|
plugin = add_plugin( |
|
placeholder=placeholder, |
|
plugin_type=plugin_type, |
|
language=language, |
|
position="last-child", |
|
target=parent, |
|
) |
|
except Exception: |
|
return None |
|
|
|
instance, _ = plugin.get_plugin_instance() |
|
if not instance: |
|
return None |
|
|
|
# Fill data |
|
if plugin_type == "LinkPlugin": |
|
name = plugin_data.get("name") or plugin_data.get("attributes", {}).get( |
|
"name" |
|
) |
|
if name: |
|
instance.name = name |
|
|
|
# PRIORITY: always use "url" if present, do not use fallback with internal_link_id |
|
link_url = plugin_data.get("url") |
|
if link_url is None: |
|
link_url = plugin_data.get("attributes", {}).get("external_link") |
|
|
|
if link_url: |
|
instance.link = link_url |
|
# Do NOT use fallback with internal_link_id because IDs don't match between sites |
|
|
|
instance.save() |
|
if self.verbosity >= 1: |
|
old_id = ( |
|
plugin_data.get("id") |
|
or plugin_data.get("attributes", {}).get("id") |
|
or plugin_data.get("attributes", {}).get("cmsplugin_ptr_id") |
|
or plugin_data.get("pk") |
|
) |
|
self.stdout.write( |
|
f' - importing link id {old_id} -> {plugin.pk} ("{name}" {link_url})... ok' |
|
) |
|
return plugin |
|
|
|
elif plugin_type == "ImagePlugin": |
|
# Image plugin from djangocms-frontend |
|
image_info = plugin_data.get("image", {}) |
|
filename = image_info.get("original_filename", "?") |
|
old_id = ( |
|
plugin_data.get("id") |
|
or plugin_data.get("attributes", {}).get("id") |
|
or plugin_data.get("attributes", {}).get("cmsplugin_ptr_id") |
|
or plugin_data.get("pk") |
|
) |
|
if self.verbosity >= 1: |
|
self.stdout.write( |
|
f" - importing image id {old_id} ({filename})...", |
|
ending="", |
|
) |
|
image = self._find_or_download_image(plugin_data, post_slug) |
|
if image: |
|
if self.verbosity >= 1: |
|
self.stdout.write( |
|
f" ok! (image ID {image.pk}: {image.original_filename})" |
|
) |
|
if not instance.config: |
|
instance.config = {} |
|
instance.config["picture"] = {"pk": image.pk, "model": "filer.image"} |
|
instance.config["external_picture"] = "" |
|
instance.config["width"] = None |
|
instance.config["height"] = None |
|
instance.config["use_automatic_scaling"] = True |
|
instance.config["picture_fluid"] = True |
|
instance.config["picture_rounded"] = False |
|
instance.config["picture_thumbnail"] = False |
|
instance.config["use_responsive_image"] = "inherit" |
|
instance.config["lazy_loading"] = False |
|
instance.config["use_crop"] = False |
|
instance.config["use_upscale"] = False |
|
instance.config["use_no_cropping"] = False |
|
instance.config["alignment"] = "" |
|
instance.config["template"] = "default" |
|
instance.config["attributes"] = {} |
|
instance.config["link"] = {} |
|
instance.config["target"] = "" |
|
instance.config["margin_x"] = "" |
|
instance.config["margin_y"] = "" |
|
instance.config["margin_devices"] = None |
|
instance.config["responsive_visibility"] = None |
|
instance.config["thumbnail_options"] = None |
|
instance.save() |
|
return plugin |
|
else: |
|
if self.verbosity >= 1: |
|
self.stdout.write(" ✗ Image not found!") |
|
|
|
return None |
|
|
|
def _replace_cms_plugin_ids(self, body, id_mapping): |
|
"""Replace IDs in <cms-plugin> tags with new IDs""" |
|
if not id_mapping: |
|
return body |
|
|
|
def replace_id(match): |
|
old_id = match.group(1) |
|
new_id = id_mapping.get(old_id) |
|
if new_id: |
|
# Replace the ID in the tag |
|
return match.group(0).replace(f'id="{old_id}"', f'id="{new_id}"') |
|
else: |
|
# If ID is not in mapping, display a warning |
|
if self.verbosity >= 2: |
|
self.stdout.write( |
|
self.style.WARNING( |
|
f"ID {old_id} not found in mapping. Available mapping: {list(id_mapping.keys())}" |
|
) |
|
) |
|
return match.group(0) |
|
|
|
# Pattern to find <cms-plugin ... id="XXX" ...> |
|
pattern = r'<cms-plugin[^>]*id="(\d+)"[^>]*>' |
|
return re.sub(pattern, replace_id, body) |
|
|
|
def _find_or_download_image(self, plugin_data, post_slug):
    """Return the filer image described by ``plugin_data["image"]``.

    An image already known to filer is preferred (see
    ``_find_existing_image``); otherwise the file is downloaded from
    ``<site_url>/media/<file_path>``.

    Args:
        plugin_data: Exported plugin dict; its ``image`` entry is read for
            ``file_path``, ``original_filename`` and ``filer_id``.
        post_slug: Slug of the post being imported (currently unused).

    Returns:
        The filer image, or None when filer is unavailable, no blog folder
        is set (e.g. dry run), or the image can be neither found nor
        downloaded.
    """
    if not FILER_AVAILABLE or not self.blog_folder:
        return None

    verbose = self.verbosity >= 2
    image = None

    if "image" in plugin_data:
        # "or" guards against an explicit JSON null.
        image_info = plugin_data["image"] or {}
        file_path = image_info.get("file_path")
        original_filename = image_info.get("original_filename")
        filer_id = image_info.get("filer_id")

        if verbose:
            self.stdout.write(" find_or_download_image:")
            self.stdout.write(f" file_path: {file_path}")
            self.stdout.write(f" original_filename: {original_filename}")
            self.stdout.write(f" filer_id: {filer_id}")

        existing = self._find_existing_image(file_path, original_filename, filer_id)
        if existing:
            return existing

        # Nothing usable locally: download from the old site's media URL.
        if file_path:
            # Strip the slashes around the join to avoid "//media/...".
            media_url = f"{self.site_url.rstrip('/')}/media/{file_path.lstrip('/')}"
            if verbose:
                self.stdout.write(f" Downloading from: {media_url}")
            image = self._download_image_from_url(
                media_url, original_filename or file_path.split("/")[-1]
            )
            if image:
                if self.verbosity >= 1:
                    self.stdout.write(
                        f" ✓ Image downloaded! (ID {image.pk}: {image.original_filename})"
                    )
                if verbose:
                    # Verify that the file exists after download
                    file_exists, _ = self._stored_file_exists(image)
                    if file_exists:
                        self.stdout.write(
                            f" ✓ File verified after download: {image.file.name}"
                        )
                    else:
                        self.stdout.write(
                            self.style.ERROR(
                                f" ✗ WARNING: File downloaded but not found: {image.file.name}"
                            )
                        )
            elif self.verbosity >= 1:
                self.stdout.write(self.style.ERROR(" ✗ Download failed"))
        elif verbose:
            self.stdout.write(" ✗ No file_path, cannot download")

    if not image and verbose:
        self.stdout.write(" ✗ Image not found after all attempts")

    return image


def _find_existing_image(self, file_path, original_filename, filer_id):
    """Look for an already imported filer image whose file really exists.

    Lookups are tried in this order; the first hit is returned:

    1. file name taken from ``file_path``,
    2. last two segments of ``file_path``,
    3. ``original_filename``,
    4. ``filer_id`` — last resort, because IDs don't match between sites;
       the record is only accepted when its file name or original file
       name matches the exported data.

    A record whose file is missing from storage is never returned, so that
    the caller downloads the file again. ``picture_id`` from the plugin
    attributes is deliberately not used: it is a filer ID of the old site.

    Returns:
        The matching filer image, or None.
    """
    verbose = self.verbosity >= 2

    if file_path:
        # Priority 1: search by the file name contained in the path.
        filename_from_path = file_path.split("/")[-1]
        if verbose:
            self.stdout.write(
                f" Search by filename_from_path: {filename_from_path}"
            )
        candidate = FilerImage.objects.filter(
            file__icontains=filename_from_path
        ).first()
        if candidate:
            file_exists, check_method = self._stored_file_exists(candidate)
            if verbose:
                self.stdout.write(
                    f" File existence check (method: {check_method}): {file_exists}"
                )
            if file_exists:
                if verbose:
                    self.stdout.write(
                        f" ✓ Found by filename! (ID {candidate.pk}: {candidate.file.name})"
                    )
                return candidate
            if self.verbosity >= 1:
                self.stdout.write(
                    self.style.WARNING(
                        f" ✗ Image found (ID {candidate.pk}) but file does not exist: {candidate.file.name}"
                    )
                )

        # Also search by partial path (last two segments of file_path).
        path_parts = file_path.split("/")
        if len(path_parts) >= 3:
            search_path = "/".join(path_parts[-2:])
            if verbose:
                self.stdout.write(f" Search by partial path: {search_path}")
            candidate = FilerImage.objects.filter(
                file__icontains=search_path
            ).first()
            if candidate:
                file_exists, _ = self._stored_file_exists(candidate)
                if file_exists:
                    if verbose:
                        self.stdout.write(
                            f" ✓ Found by partial path! (ID {candidate.pk}: {candidate.file.name})"
                        )
                    return candidate
                if verbose:
                    self.stdout.write(
                        f" ✗ Image found (ID {candidate.pk}) but file does not exist: {candidate.file.name}"
                    )

    # Priority 2: search by original_filename.
    if original_filename:
        if verbose:
            self.stdout.write(f" Search by original_filename: {original_filename}")
        candidate = FilerImage.objects.filter(
            original_filename=original_filename
        ).first()
        if candidate:
            file_exists, _ = self._stored_file_exists(candidate)
            if file_exists:
                if verbose:
                    self.stdout.write(
                        f" ✓ Found by original_filename! (ID {candidate.pk}: {candidate.file.name})"
                    )
                return candidate
            if verbose:
                self.stdout.write(
                    f" ✗ Image found (ID {candidate.pk}) but file does not exist: {candidate.file.name}"
                )

    # Priority 3: filer_id, verified against the exported names.
    if filer_id:
        if verbose:
            self.stdout.write(f" Search by filer_id: {filer_id}")
        try:
            candidate = FilerImage.objects.get(pk=filer_id)
        except (FilerImage.DoesNotExist, ValueError, TypeError):
            # ValueError/TypeError: the exported ID is not a valid key.
            if verbose:
                self.stdout.write(f" ✗ Image ID {filer_id} not found")
            return None

        stored_name = candidate.file.name or ""
        matches = False
        if file_path and file_path.split("/")[-1].lower() in stored_name.lower():
            matches = True
            if verbose:
                self.stdout.write(
                    f" ✓ Found by filer_id and verified by file_path! (ID {candidate.pk}: {candidate.file.name})"
                )
        if (
            not matches
            and original_filename
            and candidate.original_filename == original_filename
        ):
            matches = True
            if verbose:
                self.stdout.write(
                    f" ✓ Found by filer_id and verified by original_filename! (ID {candidate.pk}: {candidate.original_filename})"
                )

        if matches:
            # Same guarantee as the other lookups: the file must exist.
            file_exists, _ = self._stored_file_exists(candidate)
            if file_exists:
                return candidate
            if verbose:
                self.stdout.write(
                    f" ✗ Image found (ID {candidate.pk}) but file does not exist: {candidate.file.name}"
                )
        elif verbose:
            self.stdout.write(
                f" ✗ Image ID {filer_id} found but does not match (file: {candidate.file.name}, original: {candidate.original_filename})"
            )
            self.stdout.write(" → Continuing to download the correct image")

    return None


def _stored_file_exists(self, image):
    """Check that the file behind a filer image is present.

    The storage's ``exists`` method is preferred; the local file system is
    the fallback when the file has no usable storage but exposes a path.

    Returns:
        ``(exists, method)`` where ``method`` names the check that was
        used ("storage.exists", "os.path.exists") or is None when no check
        was possible (``exists`` is then False).
    """
    file_field = image.file
    if hasattr(file_field, "storage") and hasattr(file_field.storage, "exists"):
        return file_field.storage.exists(file_field.name), "storage.exists"
    if hasattr(file_field, "path"):
        import os

        return os.path.exists(file_field.path), "os.path.exists"
    return False, None
|
|
|
def _download_image_from_url(self, url, filename):
    """Download an image from a URL and store it as a filer image.

    Args:
        url: Address fetched with an HTTP GET (30 second timeout).
        filename: Name given to the stored file and recorded as the
            image's ``original_filename``.

    Returns:
        The newly created ``FilerImage``, or ``None`` when filer is not
        available, ``self.blog_folder`` is not set, the response is not
        an image, or any error occurs while downloading or saving.
    """
    debug = self.verbosity >= 2

    if not FILER_AVAILABLE:
        if debug:
            self.stdout.write(
                self.style.WARNING("FILER_AVAILABLE is False, cannot download")
            )
        return None

    if not self.blog_folder:
        if debug:
            self.stdout.write(
                self.style.WARNING("blog_folder is not defined, cannot download")
            )
        return None

    try:
        if debug:
            self.stdout.write(f" Starting download from {url}...")
        response = requests.get(url, timeout=30)
        response.raise_for_status()

        if debug:
            self.stdout.write(
                f" Response received: {len(response.content)} bytes"
            )

        # Verify that it's really an image. Media types are
        # case-insensitive, so normalise the header value before comparing;
        # the raw value is still what gets reported to the user.
        content_type = response.headers.get("content-type", "")
        if debug:
            self.stdout.write(f" Content-Type: {content_type}")
        if not content_type.strip().lower().startswith("image/"):
            if debug:
                self.stdout.write(
                    self.style.WARNING(
                        f"Content is not an image (content-type: {content_type})"
                    )
                )
            return None

        # Create the file in filer
        if debug:
            self.stdout.write(f" Creating ContentFile with name: {filename}")
        image_file = ContentFile(response.content, name=filename)

        if debug:
            self.stdout.write(
                f" Creating FilerImage in folder: {self.blog_folder.name} (ID: {self.blog_folder.pk})"
            )
        image = FilerImage.objects.create(
            folder=self.blog_folder,
            file=image_file,
            original_filename=filename,
        )

        if debug:
            self.stdout.write(
                f" FilerImage created successfully (ID: {image.pk}, file.name: {image.file.name})"
            )

        # Verify that the file really exists. This is informational only:
        # the image is returned even when the storage check fails.
        storage = getattr(image.file, "storage", None)
        if storage is not None and hasattr(storage, "exists"):
            if not storage.exists(image.file.name):
                if debug:
                    self.stdout.write(
                        self.style.ERROR(
                            f"WARNING: File {image.file.name} does not exist in storage!"
                        )
                    )
            elif debug:
                self.stdout.write(
                    f" ✓ File verified in storage: {image.file.name}"
                )

        return image

    except requests.exceptions.RequestException as e:
        # Network failures, timeouts and non-2xx statuses end up here.
        if self.verbosity >= 1:
            self.stdout.write(
                self.style.ERROR(f" ✗ HTTP error downloading {url}: {e}")
            )
        return None

    except Exception as e:
        # Best-effort import: a failure on one image is reported and turned
        # into ``None`` instead of being raised to the caller.
        if self.verbosity >= 1:
            self.stdout.write(
                self.style.ERROR(
                    f" ✗ Error downloading {url}: {type(e).__name__}: {e}"
                )
            )
        if debug:
            # Standard Library
            import traceback

            self.stdout.write(traceback.format_exc())
        return None