Providers Implementation Specification
Overview
This document defines the provider implementations needed for video, audio (text-to-speech), music, sound-effect, image, and storage services. Each video, audio, music, and storage provider implements the corresponding abstract interface from core/providers/base.py.
Directory Structure
core/providers/
├── __init__.py
├── base.py # Abstract interfaces (exists)
│
├── video/
│ ├── __init__.py
│ ├── runway.py # Runway Gen-3 Alpha
│ ├── pika.py # Pika Labs
│ ├── stability.py # Stability AI Video
│ ├── luma.py # Luma AI Dream Machine
│ └── kling.py # Kling AI
│
├── audio/
│ ├── __init__.py
│ ├── elevenlabs.py # ElevenLabs TTS
│ ├── openai_tts.py # OpenAI TTS
│ └── google_tts.py # Google Cloud TTS
│
├── music/
│ ├── __init__.py
│ ├── mubert.py # Mubert AI music
│ ├── soundraw.py # Soundraw AI music
│ └── suno.py # Suno AI music
│
├── sfx/
│ ├── __init__.py
│ ├── freesound.py # Freesound (free)
│ └── elevenlabs_sfx.py # ElevenLabs sound effects
│
├── image/
│ ├── __init__.py
│ ├── dalle.py # OpenAI DALL-E 3
│ ├── midjourney.py # Midjourney (via API proxy)
│ └── stability_image.py # Stability AI images
│
└── storage/
├── __init__.py
├── local.py # Local filesystem
└── s3.py # AWS S3
Video Providers
runway.py - Runway Gen-3 Alpha
"""Runway Gen-3 Alpha video generation provider"""
import os
import asyncio
import httpx
from typing import Dict, Any
from ..base import VideoProvider, GeneratedVideo
class RunwayProvider(VideoProvider):
    """
    Runway Gen-3 Alpha - High quality photorealistic video

    Features:
    - Text-to-video and image-to-video
    - 5-10 second clips
    - 720p and 1080p output
    - Motion brush for controlled animation

    Pricing (as of 2025):
    - Gen-3 Alpha: ~$0.50/second
    - Gen-3 Alpha Turbo: ~$0.25/second
    """

    BASE_URL = "https://api.runwayml.com/v1"
    # Longest clip (in seconds) this provider will ask the API for.
    MAX_DURATION = 10.0

    def __init__(self, model: str = "gen3a_turbo"):
        """
        Args:
            model: "gen3a_turbo" (fast, cheaper) or "gen3a" (higher quality)

        Raises:
            ValueError: If the RUNWAY_API_KEY environment variable is unset.
        """
        self.api_key = os.getenv("RUNWAY_API_KEY")
        self.model = model
        # Any model name containing "turbo" is billed at the turbo rate.
        self._cost_per_second = 0.25 if "turbo" in model else 0.50
        if not self.api_key:
            raise ValueError("RUNWAY_API_KEY environment variable required")

    @property
    def name(self) -> str:
        """Registry identifier for this provider."""
        return "runway"

    @property
    def cost_per_second(self) -> float:
        """Estimated cost in USD per generated second for the chosen model."""
        return self._cost_per_second

    def _auth_headers(self) -> Dict[str, str]:
        """Build the bearer-token header shared by every request."""
        return {"Authorization": f"Bearer {self.api_key}"}

    async def generate(
        self,
        prompt: str,
        duration: float,
        width: int = 1280,
        height: int = 768,
        **kwargs
    ) -> GeneratedVideo:
        """Generate video from text prompt.

        Args:
            prompt: Text description of the clip.
            duration: Desired length in seconds; clamped to MAX_DURATION and
                truncated to whole seconds (minimum 1) before being sent.
            width: Output width in pixels.
            height: Output height in pixels.
            **kwargs: Optional ``seed`` forwarded to the API when provided.

        Returns:
            GeneratedVideo whose ``duration`` and ``generation_cost`` reflect
            the whole-second duration that was actually requested.

        Raises:
            ValueError: If ``duration`` is not positive.
            httpx.HTTPStatusError: If the API answers with an error status.
            RuntimeError: If the job reports a "failed" status.
            TimeoutError: If the job does not finish within the polling window.
        """
        if duration <= 0:
            raise ValueError("duration must be positive")

        # The request carries whole seconds, so the reported duration and cost
        # must be derived from that same value rather than the raw float.
        seconds = max(1, int(min(duration, self.MAX_DURATION)))

        payload: Dict[str, Any] = {
            "model": self.model,
            "prompt": prompt,
            "duration": seconds,
            "width": width,
            "height": height,
        }
        seed = kwargs.get("seed")
        if seed is not None:
            # Only send a seed when the caller chose one; avoid posting null.
            payload["seed"] = seed

        async with httpx.AsyncClient(timeout=300.0) as client:
            # Start generation job
            response = await client.post(
                f"{self.BASE_URL}/generations",
                headers={
                    **self._auth_headers(),
                    "Content-Type": "application/json",
                },
                json=payload,
            )
            response.raise_for_status()
            job_id = response.json()["id"]

            # Poll for completion on the same connection pool
            video_url = await self._wait_for_completion(client, job_id)

        billed_seconds = float(seconds)
        return GeneratedVideo(
            video_id=job_id,
            video_url=video_url,
            duration=billed_seconds,
            width=width,
            height=height,
            format="mp4",
            generation_cost=billed_seconds * self._cost_per_second,
            provider=self.name,
            metadata={"model": self.model, "prompt": prompt[:200]},
        )

    async def _wait_for_completion(
        self,
        client: httpx.AsyncClient,
        job_id: str,
        max_wait: int = 300,
        poll_interval: int = 5
    ) -> str:
        """Poll until generation completes.

        Args:
            client: Open HTTP client reused for every status request.
            job_id: Identifier returned when the job was created.
            max_wait: Upper bound on accumulated sleep time, in seconds.
            poll_interval: Seconds to sleep between status checks.

        Returns:
            The ``output_url`` reported by the completed job.

        Raises:
            RuntimeError: If the job reports a "failed" status.
            TimeoutError: If ``max_wait`` seconds of polling elapse first.
        """
        elapsed = 0
        while elapsed < max_wait:
            status = await self._fetch_status(client, job_id)
            state = status["status"]
            if state == "completed":
                return status["output_url"]
            if state == "failed":
                raise RuntimeError(f"Generation failed: {status.get('error')}")
            await asyncio.sleep(poll_interval)
            # Only sleep time is counted; request latency is not included.
            elapsed += poll_interval
        raise TimeoutError(f"Generation timed out after {max_wait}s")

    async def _fetch_status(
        self, client: httpx.AsyncClient, job_id: str
    ) -> Dict[str, Any]:
        """Fetch the job status document using an already-open client."""
        response = await client.get(
            f"{self.BASE_URL}/generations/{job_id}",
            headers=self._auth_headers(),
        )
        response.raise_for_status()
        return response.json()

    async def check_status(self, job_id: str) -> Dict[str, Any]:
        """Check generation job status.

        Opens a short-lived client and returns the decoded JSON status
        document for ``job_id``.
        """
        async with httpx.AsyncClient() as client:
            return await self._fetch_status(client, job_id)

    async def download(self, video_url: str, local_path: str) -> str:
        """Download video to local path.

        The whole response body is held in memory before being written.

        Returns:
            ``local_path``, once the file has been written.
        """
        async with httpx.AsyncClient() as client:
            response = await client.get(video_url)
            response.raise_for_status()
        with open(local_path, "wb") as f:
            f.write(response.content)
        return local_path
pika.py - Pika Labs
"""Pika Labs video generation provider"""
import os
import asyncio
import httpx
from typing import Dict, Any
from ..base import VideoProvider, GeneratedVideo
class PikaProvider(VideoProvider):
    """
    Pika Labs - Stylized and animated video generation

    Features:
    - Text-to-video and image-to-video
    - 3-4 second clips
    - Great for stylized/animated content
    - Lip sync capabilities

    Pricing (as of 2025):
    - ~$0.20/second
    """

    BASE_URL = "https://api.pika.art/v1"

    def __init__(self):
        """Read PIKA_API_KEY from the environment.

        Raises:
            ValueError: If PIKA_API_KEY is unset.
        """
        self.api_key = os.getenv("PIKA_API_KEY")
        if not self.api_key:
            raise ValueError("PIKA_API_KEY environment variable required")

    @property
    def name(self) -> str:
        """Registry identifier for this provider."""
        return "pika"

    @property
    def cost_per_second(self) -> float:
        """Estimated cost in USD per generated second."""
        return 0.20

    def _auth_headers(self) -> Dict[str, str]:
        """Build the bearer-token header shared by every request."""
        return {"Authorization": f"Bearer {self.api_key}"}

    async def generate(
        self,
        prompt: str,
        duration: float,
        width: int = 1024,
        height: int = 576,
        **kwargs
    ) -> GeneratedVideo:
        """Generate video from text prompt.

        Args:
            prompt: Text description of the clip.
            duration: Desired length in seconds; clamped to 4 seconds.
            width: Output width in pixels.
            height: Output height in pixels.
            **kwargs: Optional ``style`` (defaults to "default").

        Raises:
            httpx.HTTPStatusError: If the API answers with an error status.
            RuntimeError: If the job reports a "failed" status.
            TimeoutError: If the job does not finish within the polling window.
        """
        # Pika max duration is 4 seconds
        duration = min(duration, 4.0)

        async with httpx.AsyncClient(timeout=180.0) as client:
            response = await client.post(
                f"{self.BASE_URL}/generate",
                headers={
                    **self._auth_headers(),
                    "Content-Type": "application/json",
                },
                json={
                    "prompt": prompt,
                    "duration": duration,
                    "width": width,
                    "height": height,
                    "style": kwargs.get("style", "default"),
                },
            )
            response.raise_for_status()
            job_id = response.json()["id"]

            # Poll for completion on the same connection pool
            video_url = await self._wait_for_completion(client, job_id)

        return GeneratedVideo(
            video_id=job_id,
            video_url=video_url,
            duration=duration,
            width=width,
            height=height,
            format="mp4",
            generation_cost=duration * self.cost_per_second,
            provider=self.name,
            metadata={"prompt": prompt[:200]},
        )

    async def _wait_for_completion(self, client, job_id: str) -> str:
        """Poll until generation completes.

        Checks the job up to 60 times, sleeping 5 seconds between checks.

        Raises:
            RuntimeError: If the job reports a "failed" status.
            TimeoutError: If all polling attempts are exhausted.
        """
        for _ in range(60):  # Max 5 minutes
            status = await self._fetch_status(client, job_id)
            state = status["status"]
            if state == "completed":
                return status["video_url"]
            if state == "failed":
                raise RuntimeError(f"Generation failed: {status.get('error')}")
            await asyncio.sleep(5)
        raise TimeoutError("Generation timed out")

    async def _fetch_status(self, client, job_id: str) -> Dict[str, Any]:
        """Fetch the job status document using an already-open client."""
        response = await client.get(
            f"{self.BASE_URL}/status/{job_id}",
            headers=self._auth_headers(),
        )
        # Surface HTTP errors instead of decoding an error body as a status.
        response.raise_for_status()
        return response.json()

    async def check_status(self, job_id: str) -> Dict[str, Any]:
        """Return the decoded JSON status document for ``job_id``."""
        async with httpx.AsyncClient() as client:
            return await self._fetch_status(client, job_id)

    async def download(self, video_url: str, local_path: str) -> str:
        """Download the video at ``video_url`` to ``local_path``.

        Returns:
            ``local_path``, once the file has been written.

        Raises:
            httpx.HTTPStatusError: If the download answers with an error
                status; nothing is written in that case.
        """
        async with httpx.AsyncClient() as client:
            response = await client.get(video_url)
            # Without this check an error page would be saved as the video.
            response.raise_for_status()
        with open(local_path, "wb") as f:
            f.write(response.content)
        return local_path
stability.py - Stability AI Video
"""Stability AI video generation provider"""
import os
import httpx
from ..base import VideoProvider, GeneratedVideo
class StabilityVideoProvider(VideoProvider):
    """
    Stability AI - Stable Video Diffusion

    Features:
    - Image-to-video (requires input image)
    - 2-4 second clips
    - Good for motion graphics and abstract

    Pricing (as of 2025):
    - ~$0.10/second
    """

    BASE_URL = "https://api.stability.ai/v2beta"

    def __init__(self):
        """Read STABILITY_API_KEY from the environment.

        Raises:
            ValueError: If STABILITY_API_KEY is unset.
        """
        self.api_key = os.getenv("STABILITY_API_KEY")
        if not self.api_key:
            raise ValueError("STABILITY_API_KEY environment variable required")

    @property
    def name(self) -> str:
        """Registry identifier for this provider."""
        return "stability"

    @property
    def cost_per_second(self) -> float:
        """Estimated cost in USD per generated second."""
        return 0.10

    async def generate(
        self,
        prompt: str,
        duration: float,
        width: int = 1024,
        height: int = 576,
        **kwargs
    ) -> GeneratedVideo:
        """Generate video - requires input_image in kwargs.

        Args:
            prompt: Accepted for interface compatibility; it is not sent in
                the request built here.
            duration: Desired length in seconds; clamped to 4 seconds and used
                only for the returned metadata and cost estimate.
            width: Width recorded on the result (not sent in the request).
            height: Height recorded on the result (not sent in the request).
            **kwargs: ``input_image`` (required path to the source image),
                plus optional ``seed``, ``cfg_scale`` and ``motion_bucket_id``.

        Raises:
            ValueError: If ``input_image`` is missing or empty.
            httpx.HTTPStatusError: If the API answers with an error status.
        """
        input_image = kwargs.get("input_image")
        if not input_image:
            raise ValueError("Stability Video requires input_image")

        duration = min(duration, 4.0)

        data = {
            "seed": kwargs.get("seed", 0),
            "cfg_scale": kwargs.get("cfg_scale", 2.5),
            "motion_bucket_id": kwargs.get("motion_bucket_id", 40),
        }

        async with httpx.AsyncClient(timeout=180.0) as client:
            # Stability uses multipart form data. The context manager makes
            # sure the image handle is closed even if the upload fails.
            with open(input_image, "rb") as image_file:
                response = await client.post(
                    f"{self.BASE_URL}/image-to-video",
                    headers={"Authorization": f"Bearer {self.api_key}"},
                    files={"image": image_file},
                    data=data,
                )
            response.raise_for_status()
            result = response.json()

        return GeneratedVideo(
            video_id=result["id"],
            video_url=result["video_url"],
            duration=duration,
            width=width,
            height=height,
            format="mp4",
            generation_cost=duration * self.cost_per_second,
            provider=self.name,
        )

    async def check_status(self, job_id: str):
        """Return the decoded JSON result document for ``job_id``.

        Raises:
            httpx.HTTPStatusError: If the API answers with an error status.
        """
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{self.BASE_URL}/results/{job_id}",
                headers={"Authorization": f"Bearer {self.api_key}"},
            )
            response.raise_for_status()
            return response.json()

    async def download(self, video_url: str, local_path: str) -> str:
        """Download the video at ``video_url`` to ``local_path``.

        Raises:
            httpx.HTTPStatusError: If the download answers with an error
                status; nothing is written in that case.
        """
        async with httpx.AsyncClient() as client:
            response = await client.get(video_url)
            # Without this check an error page would be saved as the video.
            response.raise_for_status()
        with open(local_path, "wb") as f:
            f.write(response.content)
        return local_path
luma.py - Luma AI Dream Machine
"""Luma AI Dream Machine video generation provider"""
import os
import httpx
from ..base import VideoProvider, GeneratedVideo
class LumaProvider(VideoProvider):
    """
    Luma AI Dream Machine - High quality video generation

    Features:
    - Text-to-video and image-to-video
    - 5 second clips
    - Excellent camera motion control
    - Good for cinematic shots

    Pricing (as of 2025):
    - ~$0.30/second
    """

    BASE_URL = "https://api.lumalabs.ai/v1"

    def __init__(self):
        """Read LUMA_API_KEY from the environment.

        Raises:
            ValueError: If LUMA_API_KEY is unset.
        """
        self.api_key = os.getenv("LUMA_API_KEY")
        if not self.api_key:
            raise ValueError("LUMA_API_KEY environment variable required")

    @property
    def name(self) -> str:
        """Registry identifier for this provider."""
        return "luma"

    @property
    def cost_per_second(self) -> float:
        """Estimated cost in USD per generated second."""
        return 0.30

    def _auth_headers(self):
        """Build the bearer-token header shared by every request."""
        return {"Authorization": f"Bearer {self.api_key}"}

    @staticmethod
    def _aspect_ratio(width: int, height: int) -> str:
        """Reduce pixel dimensions to a ratio string, e.g. 1280x720 -> "16:9".

        Raises:
            ValueError: If either dimension is not positive.
        """
        import math

        if width <= 0 or height <= 0:
            raise ValueError("width and height must be positive")
        divisor = math.gcd(width, height)
        return f"{width // divisor}:{height // divisor}"

    async def generate(
        self,
        prompt: str,
        duration: float,
        width: int = 1280,
        height: int = 720,
        **kwargs
    ) -> GeneratedVideo:
        """Generate video from text prompt.

        Args:
            prompt: Text description of the clip.
            duration: Desired length in seconds; clamped to 5 seconds and used
                only for the returned metadata and cost estimate (it is not
                part of the request body).
            width: Output width in pixels; sent only as a reduced aspect ratio.
            height: Output height in pixels; sent only as a reduced aspect ratio.
            **kwargs: Optional ``loop`` (default False) and ``keyframes``.

        Raises:
            ValueError: If ``width`` or ``height`` is not positive.
            httpx.HTTPStatusError: If the API answers with an error status.
            Exception: If the job reports a "failed" state.
            TimeoutError: If the job does not finish within the polling window.
        """
        duration = min(duration, 5.0)

        payload = {
            "prompt": prompt,
            # Send the reduced ratio ("16:9") instead of raw pixels ("1280:720").
            "aspect_ratio": self._aspect_ratio(width, height),
            "loop": kwargs.get("loop", False),
        }
        keyframes = kwargs.get("keyframes")  # For camera control
        if keyframes is not None:
            # Only include keyframes when supplied; avoid posting null.
            payload["keyframes"] = keyframes

        async with httpx.AsyncClient(timeout=300.0) as client:
            response = await client.post(
                f"{self.BASE_URL}/generations",
                headers={
                    **self._auth_headers(),
                    "Content-Type": "application/json",
                },
                json=payload,
            )
            response.raise_for_status()
            result = response.json()

            # Poll for completion on the same connection pool
            video_url = await self._wait_for_completion(client, result["id"])

        return GeneratedVideo(
            video_id=result["id"],
            video_url=video_url,
            duration=duration,
            width=width,
            height=height,
            format="mp4",
            generation_cost=duration * self.cost_per_second,
            provider=self.name,
        )

    async def _wait_for_completion(self, client, job_id: str) -> str:
        """Poll until the job's ``state`` is "completed" and return its URL.

        Checks the job up to 60 times, sleeping 5 seconds between checks.

        Raises:
            Exception: If the job reports a "failed" state.
            TimeoutError: If all polling attempts are exhausted.
        """
        import asyncio

        for _ in range(60):
            status = await self._fetch_status(client, job_id)
            state = status["state"]
            if state == "completed":
                return status["video"]["url"]
            if state == "failed":
                raise Exception("Generation failed")
            await asyncio.sleep(5)
        raise TimeoutError("Generation timed out")

    async def _fetch_status(self, client, job_id: str):
        """Fetch the job status document using an already-open client."""
        response = await client.get(
            f"{self.BASE_URL}/generations/{job_id}",
            headers=self._auth_headers(),
        )
        # Surface HTTP errors instead of decoding an error body as a status.
        response.raise_for_status()
        return response.json()

    async def check_status(self, job_id: str):
        """Return the decoded JSON status document for ``job_id``."""
        async with httpx.AsyncClient() as client:
            return await self._fetch_status(client, job_id)

    async def download(self, video_url: str, local_path: str) -> str:
        """Download the video at ``video_url`` to ``local_path``.

        Raises:
            httpx.HTTPStatusError: If the download answers with an error
                status; nothing is written in that case.
        """
        async with httpx.AsyncClient() as client:
            response = await client.get(video_url)
            # Without this check an error page would be saved as the video.
            response.raise_for_status()
        with open(local_path, "wb") as f:
            f.write(response.content)
        return local_path
kling.py - Kling AI
"""Kling AI video generation provider"""
import os
import httpx
from ..base import VideoProvider, GeneratedVideo
class KlingProvider(VideoProvider):
    """
    Kling AI - High quality Chinese video model

    Features:
    - Text-to-video and image-to-video
    - Up to 10 second clips
    - Good motion and physics

    Pricing (as of 2025):
    - ~$0.15/second (Standard)
    - ~$0.30/second (Pro)
    """

    BASE_URL = "https://api.klingai.com/v1"

    def __init__(self, mode: str = "standard"):
        """
        Args:
            mode: "standard" is billed at the lower rate; any other value is
                billed at the Pro rate.

        Raises:
            ValueError: If the KLING_API_KEY environment variable is unset.
        """
        self.api_key = os.getenv("KLING_API_KEY")
        self.mode = mode
        self._cost = 0.15 if mode == "standard" else 0.30
        if not self.api_key:
            raise ValueError("KLING_API_KEY environment variable required")

    @property
    def name(self) -> str:
        """Registry identifier for this provider."""
        return "kling"

    @property
    def cost_per_second(self) -> float:
        """Estimated cost in USD per generated second for the chosen mode."""
        return self._cost

    async def generate(
        self,
        prompt: str,
        duration: float,
        width: int = 1280,
        height: int = 720,
        **kwargs
    ) -> GeneratedVideo:
        """Generate video from text prompt.

        Args:
            prompt: Text description of the clip.
            duration: Desired length in seconds; clamped to 10 seconds.
            width: Width recorded on the result (not sent in the request).
            height: Height recorded on the result (not sent in the request).

        Raises:
            httpx.HTTPStatusError: If the API answers with an error status.
        """
        duration = min(duration, 10.0)

        async with httpx.AsyncClient(timeout=300.0) as client:
            response = await client.post(
                f"{self.BASE_URL}/video/generate",
                headers={"Authorization": f"Bearer {self.api_key}"},
                json={
                    "prompt": prompt,
                    "duration": duration,
                    "mode": self.mode,
                },
            )
            response.raise_for_status()
            result = response.json()

        # NOTE(review): unlike the polling providers, this reads video_url
        # straight from the creation response - confirm the endpoint really
        # returns the finished URL synchronously.
        return GeneratedVideo(
            video_id=result["task_id"],
            video_url=result["video_url"],
            duration=duration,
            width=width,
            height=height,
            format="mp4",
            generation_cost=duration * self._cost,
            provider=self.name,
        )

    async def check_status(self, job_id: str):
        """Return the decoded JSON status document for ``job_id``.

        Raises:
            httpx.HTTPStatusError: If the API answers with an error status.
        """
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{self.BASE_URL}/video/status/{job_id}",
                headers={"Authorization": f"Bearer {self.api_key}"},
            )
            response.raise_for_status()
            return response.json()

    async def download(self, video_url: str, local_path: str) -> str:
        """Download the video at ``video_url`` to ``local_path``.

        Raises:
            httpx.HTTPStatusError: If the download answers with an error
                status; nothing is written in that case.
        """
        async with httpx.AsyncClient() as client:
            response = await client.get(video_url)
            # Without this check an error page would be saved as the video.
            response.raise_for_status()
        with open(local_path, "wb") as f:
            f.write(response.content)
        return local_path
Audio Providers
elevenlabs.py - ElevenLabs TTS
"""ElevenLabs text-to-speech provider"""
import os
import httpx
from typing import List, Dict, Any
from ..base import AudioProvider, GeneratedAudio
class ElevenLabsProvider(AudioProvider):
    """
    ElevenLabs - Premium AI voice synthesis

    Features:
    - Ultra-realistic voices
    - Voice cloning
    - Multilingual support
    - Emotion control

    Pricing (as of 2025):
    - ~$0.30 per 1K characters
    """

    BASE_URL = "https://api.elevenlabs.io/v1"

    # Default voice IDs
    VOICES = {
        "professional_male": "pNInz6obpgDQGcFmaJgB",  # Adam
        "professional_female": "21m00Tcm4TlvDq8ikWAM",  # Rachel
        "casual_male": "TxGEqnHWrfWFTfGW9XjX",  # Josh
        "casual_female": "EXAVITQu4vr4xnSDxMaL",  # Sarah
        "narrator": "VR6AewLTigWG4xSOukaG",  # Arnold
    }

    def __init__(self, voice_id: str = None):
        """
        Args:
            voice_id: Voice used when ``generate_speech`` is called without
                one; falls back to the "professional_male" preset.

        Raises:
            ValueError: If the ELEVENLABS_API_KEY environment variable is unset.
        """
        self.api_key = os.getenv("ELEVENLABS_API_KEY")
        self.default_voice = voice_id or self.VOICES["professional_male"]
        if not self.api_key:
            raise ValueError("ELEVENLABS_API_KEY environment variable required")

    @property
    def name(self) -> str:
        """Registry identifier for this provider."""
        return "elevenlabs"

    async def generate_speech(
        self,
        text: str,
        voice_id: str = None,
        speed: float = 1.0,
        **kwargs
    ) -> GeneratedAudio:
        """Generate speech from text.

        Args:
            text: Text to synthesize.
            voice_id: Voice to use; defaults to the provider's default voice.
            speed: Playback-speed factor. It only scales the duration estimate
                here; it is not part of the request body.
            **kwargs: Optional ``model_id``, ``stability``,
                ``similarity_boost``, ``style`` and ``speaker_boost``.

        Returns:
            GeneratedAudio with an estimated duration and cost. The
            ``audio_url`` is a placeholder; the audio bytes are not persisted.

        Raises:
            ValueError: If ``speed`` is not positive.
            httpx.HTTPStatusError: If the API answers with an error status.
        """
        import hashlib

        # Validate before the billable request: the duration estimate below
        # divides by speed.
        if speed <= 0:
            raise ValueError("speed must be positive")

        voice_id = voice_id or self.default_voice

        async with httpx.AsyncClient(timeout=60.0) as client:
            response = await client.post(
                f"{self.BASE_URL}/text-to-speech/{voice_id}",
                headers={
                    "xi-api-key": self.api_key,
                    "Content-Type": "application/json",
                },
                json={
                    "text": text,
                    "model_id": kwargs.get("model_id", "eleven_multilingual_v2"),
                    "voice_settings": {
                        "stability": kwargs.get("stability", 0.5),
                        "similarity_boost": kwargs.get("similarity_boost", 0.75),
                        "style": kwargs.get("style", 0.5),
                        "use_speaker_boost": kwargs.get("speaker_boost", True),
                    },
                },
            )
            response.raise_for_status()

        # Response is audio bytes (response.content).
        # TODO: for a real implementation, save them to file/storage and
        # return that URL instead of the placeholder below.

        # Estimate duration (~150 words per minute)
        word_count = len(text.split())
        estimated_duration = (word_count / 150) * 60 / speed

        # Calculate cost (~$0.30 per 1K chars)
        cost = (len(text) / 1000) * 0.30

        # Built-in hash() of a str is salted per process, so it cannot give a
        # repeatable ID; a content digest is stable across runs.
        digest = hashlib.sha256(f"{voice_id}:{text}".encode("utf-8")).hexdigest()
        audio_id = f"el_{digest[:16]}"

        return GeneratedAudio(
            audio_id=audio_id,
            audio_url=f"file://{audio_id}.mp3",  # Would be real URL
            duration=estimated_duration,
            format="mp3",
            sample_rate=44100,
            channels=1,
            generation_cost=cost,
            provider=self.name,
            metadata={
                "voice_id": voice_id,
                "char_count": len(text),
                "word_count": word_count,
            },
        )

    async def list_voices(self) -> List[Dict[str, Any]]:
        """List available voices.

        Returns:
            The ``voices`` array from the API's JSON response.

        Raises:
            httpx.HTTPStatusError: If the API answers with an error status.
        """
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{self.BASE_URL}/voices",
                headers={"xi-api-key": self.api_key},
            )
            response.raise_for_status()
            return response.json()["voices"]
openai_tts.py - OpenAI TTS
"""OpenAI text-to-speech provider"""
import os
import httpx
from typing import List, Dict, Any
from ..base import AudioProvider, GeneratedAudio
class OpenAITTSProvider(AudioProvider):
    """
    OpenAI TTS - Fast, affordable voice synthesis

    Features:
    - 6 built-in voices
    - Fast generation
    - Good quality for the price

    Pricing (as of 2025):
    - TTS-1: ~$0.015 per 1K characters
    - TTS-1-HD: ~$0.030 per 1K characters
    """

    BASE_URL = "https://api.openai.com/v1"

    VOICES = ["alloy", "echo", "fable", "onyx", "nova", "shimmer"]

    def __init__(self, model: str = "tts-1"):
        """
        Args:
            model: "tts-1" is billed at the lower rate; any other value is
                billed at the TTS-1-HD rate.

        Raises:
            ValueError: If the OPENAI_API_KEY environment variable is unset.
        """
        self.api_key = os.getenv("OPENAI_API_KEY")
        self.model = model
        self._cost_per_1k = 0.015 if model == "tts-1" else 0.030
        if not self.api_key:
            raise ValueError("OPENAI_API_KEY environment variable required")

    @property
    def name(self) -> str:
        """Registry identifier for this provider."""
        return "openai_tts"

    async def generate_speech(
        self,
        text: str,
        voice_id: str = "alloy",
        speed: float = 1.0,
        **kwargs
    ) -> GeneratedAudio:
        """Generate speech from text.

        Args:
            text: Text to synthesize.
            voice_id: One of ``VOICES``; unknown values fall back to "alloy".
            speed: Speed factor sent to the API and used to scale the
                duration estimate.
            **kwargs: Optional ``format`` (default "mp3") sent as
                ``response_format``.

        Returns:
            GeneratedAudio whose ``format`` and placeholder URL extension
            match the requested response format.

        Raises:
            ValueError: If ``speed`` is not positive.
            httpx.HTTPStatusError: If the API answers with an error status.
        """
        import hashlib

        # Validate before the billable request: the duration estimate below
        # divides by speed.
        if speed <= 0:
            raise ValueError("speed must be positive")

        if voice_id not in self.VOICES:
            voice_id = "alloy"

        audio_format = kwargs.get("format", "mp3")

        async with httpx.AsyncClient(timeout=60.0) as client:
            response = await client.post(
                f"{self.BASE_URL}/audio/speech",
                headers={
                    "Authorization": f"Bearer {self.api_key}",
                    "Content-Type": "application/json",
                },
                json={
                    "model": self.model,
                    "input": text,
                    "voice": voice_id,
                    "speed": speed,
                    "response_format": audio_format,
                },
            )
            response.raise_for_status()

        # The audio bytes are in response.content.
        # TODO: persist them; the URL returned below is only a placeholder.

        # Estimate duration (~150 words per minute)
        word_count = len(text.split())
        estimated_duration = (word_count / 150) * 60 / speed
        cost = (len(text) / 1000) * self._cost_per_1k

        # Built-in hash() of a str is salted per process, so it cannot give a
        # repeatable ID; a content digest is stable across runs.
        digest = hashlib.sha256(
            f"{self.model}:{voice_id}:{text}".encode("utf-8")
        ).hexdigest()
        audio_id = f"oai_{digest[:16]}"

        return GeneratedAudio(
            audio_id=audio_id,
            # Report the format that was actually requested, not always mp3.
            audio_url=f"file://{audio_id}.{audio_format}",
            duration=estimated_duration,
            format=audio_format,
            sample_rate=24000,
            channels=1,
            generation_cost=cost,
            provider=self.name,
            metadata={"voice": voice_id, "model": self.model},
        )

    async def list_voices(self) -> List[Dict[str, Any]]:
        """Return the built-in voices as ``{"id", "name"}`` dictionaries."""
        return [{"id": v, "name": v.title()} for v in self.VOICES]
google_tts.py - Google Cloud TTS
"""Google Cloud text-to-speech provider"""
import os
from typing import List, Dict, Any
from ..base import AudioProvider, GeneratedAudio
class GoogleTTSProvider(AudioProvider):
    """
    Google Cloud TTS - Affordable, wide language support

    Features:
    - 200+ voices
    - 40+ languages
    - WaveNet and Neural2 options

    Pricing (as of 2025):
    - Standard: $4 per 1M characters
    - WaveNet: $16 per 1M characters
    - Neural2: $16 per 1M characters
    """

    def __init__(self, voice_type: str = "Neural2"):
        """
        Args:
            voice_type: "Standard" is billed at the lower rate; any other
                value is billed at the WaveNet/Neural2 rate.
        """
        self.voice_type = voice_type
        # Pricing per 1K chars
        if voice_type == "Standard":
            self._cost_per_1k = 0.004
        else:
            self._cost_per_1k = 0.016

    @property
    def name(self) -> str:
        """Registry identifier for this provider."""
        return "google_tts"

    async def generate_speech(
        self,
        text: str,
        voice_id: str = "en-US-Neural2-D",
        speed: float = 1.0,
        **kwargs
    ) -> GeneratedAudio:
        """Generate speech from text using Google Cloud TTS.

        This is a stub: no API call is made. It returns estimated duration
        and cost together with a placeholder ``audio_url``.

        Args:
            text: Text to synthesize.
            voice_id: Voice name recorded in the result metadata.
            speed: Speed factor used to scale the duration estimate.

        Raises:
            ValueError: If ``speed`` is not positive.
        """
        import hashlib

        # The duration estimate below divides by speed.
        if speed <= 0:
            raise ValueError("speed must be positive")

        # Would use google-cloud-texttospeech library
        # from google.cloud import texttospeech

        # Estimate duration (~150 words per minute)
        word_count = len(text.split())
        estimated_duration = (word_count / 150) * 60 / speed
        cost = (len(text) / 1000) * self._cost_per_1k

        # Built-in hash() of a str is salted per process, so it cannot give a
        # repeatable ID; a content digest is stable across runs.
        digest = hashlib.sha256(f"{voice_id}:{text}".encode("utf-8")).hexdigest()
        audio_id = f"gcp_{digest[:16]}"

        return GeneratedAudio(
            audio_id=audio_id,
            audio_url=f"file://{audio_id}.mp3",
            duration=estimated_duration,
            format="mp3",
            sample_rate=24000,
            channels=1,
            generation_cost=cost,
            provider=self.name,
            metadata={"voice_id": voice_id, "type": self.voice_type},
        )

    async def list_voices(self) -> List[Dict[str, Any]]:
        """Return a fixed sample of voices (no API call is made)."""
        # Would call Google API
        return [
            {"id": "en-US-Neural2-D", "name": "US Male", "language": "en-US"},
            {"id": "en-US-Neural2-F", "name": "US Female", "language": "en-US"},
            {"id": "en-GB-Neural2-A", "name": "UK Female", "language": "en-GB"},
        ]
Music Providers
mubert.py - Mubert AI Music
"""Mubert AI music generation provider"""
import os
import httpx
from ..base import MusicProvider, GeneratedAudio
class MubertProvider(MusicProvider):
    """
    Mubert - AI-generated royalty-free music

    Features:
    - Infinite unique tracks
    - Genre and mood control
    - Commercial license included

    Pricing (as of 2025):
    - ~$0.50 per track
    """

    BASE_URL = "https://api.mubert.com/v2"

    MOODS = ["upbeat", "calm", "energetic", "melancholic", "epic", "ambient"]
    GENRES = ["electronic", "acoustic", "orchestral", "rock", "jazz", "ambient"]

    def __init__(self):
        """Load the API key from MUBERT_API_KEY; raise ValueError if absent."""
        self.api_key = os.getenv("MUBERT_API_KEY")
        if self.api_key:
            return
        raise ValueError("MUBERT_API_KEY environment variable required")

    @property
    def name(self) -> str:
        """Registry identifier for this provider."""
        return "mubert"

    async def generate(
        self,
        mood: str,
        duration: float,
        tempo: str = "medium",
        **kwargs
    ) -> GeneratedAudio:
        """Generate background music.

        Args:
            mood: Mood label sent to the API.
            duration: Track length sent to the API and recorded on the result.
            tempo: Tempo label sent to the API.
            **kwargs: Optional ``genre`` (default "electronic") and
                ``intensity`` (default 0.5).

        Returns:
            GeneratedAudio built from the response's ``track_id`` and ``url``,
            with a flat per-track cost of 0.50.
        """
        request_body = {
            "duration": duration,
            "mood": mood,
            "tempo": tempo,
            "genre": kwargs.get("genre", "electronic"),
            "intensity": kwargs.get("intensity", 0.5),
        }
        auth = {"Authorization": f"Bearer {self.api_key}"}

        async with httpx.AsyncClient(timeout=120.0) as http:
            reply = await http.post(
                f"{self.BASE_URL}/generate",
                headers=auth,
                json=request_body,
            )
            reply.raise_for_status()
            track = reply.json()

        return GeneratedAudio(
            audio_id=track["track_id"],
            audio_url=track["url"],
            duration=duration,
            format="mp3",
            sample_rate=44100,
            channels=2,
            generation_cost=0.50,
            provider=self.name,
            metadata={"mood": mood, "tempo": tempo},
        )
suno.py - Suno AI Music
"""Suno AI music generation provider"""
import os
import httpx
from ..base import MusicProvider, GeneratedAudio
class SunoProvider(MusicProvider):
    """
    Suno - AI music with vocals and lyrics

    Features:
    - Full songs with vocals
    - Custom lyrics support
    - Multiple genres

    Pricing (as of 2025):
    - ~$0.05 per second
    """

    BASE_URL = "https://api.suno.ai/v1"

    def __init__(self):
        """Load the API key from SUNO_API_KEY; raise ValueError if absent."""
        self.api_key = os.getenv("SUNO_API_KEY")
        if self.api_key:
            return
        raise ValueError("SUNO_API_KEY environment variable required")

    @property
    def name(self) -> str:
        """Registry identifier for this provider."""
        return "suno"

    async def generate(
        self,
        mood: str,
        duration: float,
        tempo: str = "medium",
        **kwargs
    ) -> GeneratedAudio:
        """Generate music (optionally with vocals).

        Args:
            mood: Mood label; used in the default prompt and result metadata.
            duration: Track length sent to the API; also drives the cost.
            tempo: Tempo label; used in the default prompt.
            **kwargs: Optional ``prompt`` (overrides the default
                "<mood> <tempo> instrumental"), ``instrumental``
                (default True) and ``lyrics``.

        Returns:
            GeneratedAudio built from the response's ``id`` and ``audio_url``,
            costed at 0.05 per second of ``duration``.
        """
        request_body = {
            "prompt": kwargs.get("prompt", f"{mood} {tempo} instrumental"),
            "duration": duration,
            "instrumental": kwargs.get("instrumental", True),
            "lyrics": kwargs.get("lyrics"),
        }
        auth = {"Authorization": f"Bearer {self.api_key}"}

        async with httpx.AsyncClient(timeout=180.0) as http:
            reply = await http.post(
                f"{self.BASE_URL}/generate",
                headers=auth,
                json=request_body,
            )
            reply.raise_for_status()
            song = reply.json()

        return GeneratedAudio(
            audio_id=song["id"],
            audio_url=song["audio_url"],
            duration=duration,
            format="mp3",
            sample_rate=44100,
            channels=2,
            generation_cost=duration * 0.05,
            provider=self.name,
            metadata={"mood": mood, "has_vocals": not kwargs.get("instrumental", True)},
        )
Image Providers
dalle.py - OpenAI DALL-E 3
"""OpenAI DALL-E 3 image generation provider"""
import os
import httpx
from dataclasses import dataclass
from typing import Optional
@dataclass
class GeneratedImage:
    """Result record describing one generated image."""

    # Identifier and location of the image
    image_id: str
    image_url: str

    # Pixel dimensions and file format
    width: int
    height: int
    format: str

    # Cost of the generation and the provider that produced it
    generation_cost: float
    provider: str

    # Prompt as rewritten by the provider, when one is returned
    revised_prompt: Optional[str] = None
class DalleProvider:
    """
    DALL-E 3 - High quality image generation

    Pricing (as of 2025):
    - 1024x1024: $0.04
    - 1024x1792 / 1792x1024: $0.08
    """

    BASE_URL = "https://api.openai.com/v1"

    # Friendly size names accepted by generate()
    SIZES = {
        "square": "1024x1024",
        "portrait": "1024x1792",
        "landscape": "1792x1024"
    }

    # Cost in USD per image, keyed by the size string sent to the API
    COSTS = {
        "1024x1024": 0.04,
        "1024x1792": 0.08,
        "1792x1024": 0.08
    }

    def __init__(self):
        """Read OPENAI_API_KEY from the environment.

        Raises:
            ValueError: If OPENAI_API_KEY is unset.
        """
        self.api_key = os.getenv("OPENAI_API_KEY")
        if not self.api_key:
            raise ValueError("OPENAI_API_KEY environment variable required")

    @property
    def name(self) -> str:
        """Registry identifier for this provider."""
        return "dalle"

    async def generate(
        self,
        prompt: str,
        size: str = "landscape",
        **kwargs
    ) -> "GeneratedImage":
        """Generate image from text prompt.

        Args:
            prompt: Text description of the image.
            size: A key of ``SIZES`` ("square", "portrait", "landscape") or a
                literal size string that appears in ``COSTS``.
            **kwargs: Optional ``quality`` (default "standard").

        Returns:
            GeneratedImage for the first image in the response.

        Raises:
            ValueError: If ``size`` does not resolve to a priced size. This is
                checked before the request so no paid call is wasted.
            httpx.HTTPStatusError: If the API answers with an error status.
        """
        import hashlib

        size_str = self.SIZES.get(size, size)
        if size_str not in self.COSTS:
            supported = sorted(set(self.SIZES) | set(self.COSTS))
            raise ValueError(
                f"Unsupported size {size!r}; expected one of {supported}"
            )

        async with httpx.AsyncClient(timeout=60.0) as client:
            response = await client.post(
                f"{self.BASE_URL}/images/generations",
                headers={"Authorization": f"Bearer {self.api_key}"},
                json={
                    "model": "dall-e-3",
                    "prompt": prompt,
                    "size": size_str,
                    "quality": kwargs.get("quality", "standard"),
                    "n": 1
                },
            )
            response.raise_for_status()
            result = response.json()

        image_data = result["data"][0]
        width, height = map(int, size_str.split("x"))

        # Built-in hash() of a str is salted per process, so it cannot give a
        # repeatable ID; a content digest is stable across runs.
        digest = hashlib.sha256(f"{size_str}:{prompt}".encode("utf-8")).hexdigest()

        # NOTE(review): the cost lookup depends on size only; the "quality"
        # option is not reflected in generation_cost - confirm pricing.
        return GeneratedImage(
            image_id=f"dalle_{digest[:16]}",
            image_url=image_data["url"],
            width=width,
            height=height,
            format="png",
            generation_cost=self.COSTS[size_str],
            provider=self.name,
            revised_prompt=image_data.get("revised_prompt"),
        )
Storage Providers
local.py - Local Filesystem
"""Local filesystem storage provider"""
import os
import shutil
import aiofiles
from pathlib import Path
from ..base import StorageProvider, StoredFile
class LocalStorageProvider(StorageProvider):
    """Local filesystem storage for development"""

    def __init__(self, base_path: str = "./artifacts"):
        """Create the storage root directory if it does not exist yet.

        Args:
            base_path: Directory under which every stored file is kept.
        """
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)

    @property
    def name(self) -> str:
        """Registry identifier for this provider."""
        return "local"

    def _locate(self, remote_path: str) -> Path:
        """Map ``remote_path`` to a path under the storage root.

        Raises:
            ValueError: If the path would resolve outside the storage root
                (for example via ".." segments or an absolute path).
        """
        root = self.base_path.resolve()
        resolved = (root / remote_path).resolve()
        if resolved != root and root not in resolved.parents:
            raise ValueError(f"remote_path escapes storage root: {remote_path}")
        # Return the unresolved form so reported URLs keep their original shape.
        return self.base_path / remote_path

    async def upload(self, local_path: str, remote_path: str) -> StoredFile:
        """Copy ``local_path`` into storage at ``remote_path``.

        Parent directories are created as needed. The copy is a blocking
        call made directly from this coroutine.

        Returns:
            StoredFile with a ``file://`` URL, the copied file's size and a
            content type guessed from the extension.

        Raises:
            ValueError: If ``remote_path`` escapes the storage root.
        """
        dest = self._locate(remote_path)
        dest.parent.mkdir(parents=True, exist_ok=True)

        shutil.copy2(local_path, dest)

        stat = dest.stat()
        return StoredFile(
            file_id=remote_path,
            url=f"file://{dest.absolute()}",
            size_bytes=stat.st_size,
            content_type=self._guess_content_type(remote_path),
        )

    async def download(self, remote_path: str, local_path: str) -> str:
        """Copy the stored file at ``remote_path`` to ``local_path``.

        The destination's parent directory is created when missing, matching
        what ``upload`` does for its destination.

        Returns:
            ``local_path``.

        Raises:
            ValueError: If ``remote_path`` escapes the storage root.
        """
        src = self._locate(remote_path)
        Path(local_path).parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(src, local_path)
        return local_path

    async def get_url(self, remote_path: str, expires_in: int = 3600) -> str:
        """Return a ``file://`` URL for ``remote_path``.

        ``expires_in`` is accepted for interface compatibility and ignored.

        Raises:
            ValueError: If ``remote_path`` escapes the storage root.
        """
        return f"file://{self._locate(remote_path).absolute()}"

    def _guess_content_type(self, path: str) -> str:
        """Map a file extension to a MIME type, defaulting to octet-stream."""
        ext = Path(path).suffix.lower()
        types = {
            ".mp4": "video/mp4",
            ".mp3": "audio/mpeg",
            ".wav": "audio/wav",
            ".png": "image/png",
            ".jpg": "image/jpeg",
            ".json": "application/json",
        }
        return types.get(ext, "application/octet-stream")
s3.py - AWS S3
"""AWS S3 storage provider"""
import os
from pathlib import Path
from ..base import StorageProvider, StoredFile
class S3StorageProvider(StorageProvider):
    """AWS S3 storage for production.

    This is a stub: the boto3 calls are commented out, so nothing is
    transferred to or from S3 yet.
    """

    def __init__(self, bucket: str, prefix: str = ""):
        """
        Args:
            bucket: Name of the S3 bucket.
            prefix: Optional key prefix placed before every remote path.
        """
        self.bucket = bucket
        self.prefix = prefix
        # Would use boto3
        # import boto3
        # self.client = boto3.client('s3')

    @property
    def name(self) -> str:
        """Registry identifier for this provider."""
        return "s3"

    def _key(self, remote_path: str) -> str:
        """Build the object key for ``remote_path``.

        Without a prefix the path is used unchanged. With a prefix, the two
        parts are joined by exactly one "/" so a trailing slash on the prefix
        or a leading slash on the path does not produce "//".
        """
        if not self.prefix:
            return remote_path
        return f"{self.prefix.rstrip('/')}/{remote_path.lstrip('/')}"

    async def upload(self, local_path: str, remote_path: str) -> StoredFile:
        """Describe the upload of ``local_path`` to ``remote_path``.

        No transfer happens yet; only the local file's size is read.

        Returns:
            StoredFile with an ``s3://`` URL and a guessed content type.
        """
        key = self._key(remote_path)
        # self.client.upload_file(local_path, self.bucket, key)

        size = Path(local_path).stat().st_size
        return StoredFile(
            file_id=key,
            url=f"s3://{self.bucket}/{key}",
            size_bytes=size,
            content_type=self._guess_content_type(remote_path),
        )

    async def download(self, remote_path: str, local_path: str) -> str:
        """Return ``local_path``; the actual download is not implemented yet."""
        # key = self._key(remote_path)
        # self.client.download_file(self.bucket, key, local_path)
        return local_path

    async def get_url(self, remote_path: str, expires_in: int = 3600) -> str:
        """Return an unsigned HTTPS URL for ``remote_path``.

        ``expires_in`` is unused until presigned URLs are enabled.
        """
        key = self._key(remote_path)
        # return self.client.generate_presigned_url(
        #     'get_object',
        #     Params={'Bucket': self.bucket, 'Key': key},
        #     ExpiresIn=expires_in
        # )
        return f"https://{self.bucket}.s3.amazonaws.com/{key}"

    def _guess_content_type(self, path: str) -> str:
        """Map a file extension to a MIME type, defaulting to octet-stream."""
        ext = Path(path).suffix.lower()
        types = {
            ".mp4": "video/mp4",
            ".mp3": "audio/mpeg",
            ".wav": "audio/wav",
            ".png": "image/png",
            ".jpg": "image/jpeg",
            ".json": "application/json",
        }
        return types.get(ext, "application/octet-stream")
Provider Registry Update
# core/providers/__init__.py
from .base import (
VideoProvider, AudioProvider, MusicProvider, StorageProvider,
GeneratedVideo, GeneratedAudio, StoredFile,
ProviderRegistry
)
# Video
from .video.runway import RunwayProvider
from .video.pika import PikaProvider
from .video.stability import StabilityVideoProvider
from .video.luma import LumaProvider
from .video.kling import KlingProvider
# Audio
from .audio.elevenlabs import ElevenLabsProvider
from .audio.openai_tts import OpenAITTSProvider
from .audio.google_tts import GoogleTTSProvider
# Music
from .music.mubert import MubertProvider
from .music.suno import SunoProvider
# Image
from .image.dalle import DalleProvider
# Storage
from .storage.local import LocalStorageProvider
from .storage.s3 import S3StorageProvider
def create_default_registry(mode: str = "mock") -> ProviderRegistry:
    """Create a provider registry with default providers.

    Args:
        mode: "mock" registers the test doubles from tests.mocks.providers;
            any other value registers the live Runway, ElevenLabs and Mubert
            providers.

    Returns:
        A registry with video, audio, music and local storage providers.
    """
    registry = ProviderRegistry()

    if mode != "mock":
        # Live providers - will raise if API keys missing
        video, audio, music = RunwayProvider(), ElevenLabsProvider(), MubertProvider()
    else:
        # Imported lazily so test-only code is loaded only in mock mode.
        from tests.mocks.providers import (
            MockVideoProvider, MockAudioProvider, MockMusicProvider
        )
        video, audio, music = MockVideoProvider(), MockAudioProvider(), MockMusicProvider()

    registry.register_video(video)
    registry.register_audio(audio)
    registry.register_music(music)

    # Storage - always use local for now
    registry.register_storage(LocalStorageProvider())

    return registry
# Public API of core.providers: every name listed here is imported or defined
# in this module, grouped by provider category.
__all__ = [
# Base
"VideoProvider", "AudioProvider", "MusicProvider", "StorageProvider",
"GeneratedVideo", "GeneratedAudio", "StoredFile",
"ProviderRegistry",
# Video
"RunwayProvider", "PikaProvider", "StabilityVideoProvider",
"LumaProvider", "KlingProvider",
# Audio
"ElevenLabsProvider", "OpenAITTSProvider", "GoogleTTSProvider",
# Music
"MubertProvider", "SunoProvider",
# Image
"DalleProvider",
# Storage
"LocalStorageProvider", "S3StorageProvider",
# Factory
"create_default_registry",
]
Cost Summary
| Provider | Type | Cost | Max Duration |
|---|---|---|---|
| Runway Gen-3 | Video | $0.25-0.50/sec | 10s |
| Pika Labs | Video | $0.20/sec | 4s |
| Stability Video | Video | $0.10/sec | 4s |
| Luma AI | Video | $0.30/sec | 5s |
| Kling AI | Video | $0.15-0.30/sec | 10s |
| ElevenLabs | Audio | $0.30/1K chars | - |
| OpenAI TTS | Audio | $0.015-0.030/1K chars | - |
| Google TTS | Audio | $0.004-0.016/1K chars | - |
| Mubert | Music | $0.50/track | 5min |
| Suno | Music | $0.05/sec | 2min |
| DALL-E 3 | Image | $0.04-0.08/image | - |
Implementation Priority
- Now: Create stub files with interfaces
- After Audio Pipeline: Implement ElevenLabs + Mubert
- After Docker: Implement Runway + Pika
- As Needed: Other providers
All providers follow the same pattern: implement the relevant interface, then add the provider to the registry.