You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

37 lines
977 B
Python

import boto3
from botocore.exceptions import ClientError
from yt_dlp_stream_to_s3.config import (
AWS_ENDPOINT_URL,
AWS_SECRET_ACCESS_KEY,
AWS_ACCESS_KEY_ID,
)
from yt_dlp_stream_to_s3.errors import YtDlpStreamToS3ConnectionError
creds = dict(
endpoint_url=AWS_ENDPOINT_URL,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
aws_access_key_id=AWS_ACCESS_KEY_ID,
)
try:
s3_client = boto3.client("s3", **creds)
except Exception:
raise YtDlpStreamToS3ConnectionError(
f"Could not connect to S3 service {AWS_ENDPOINT_URL}"
)
def get_file_size(bucket_name: str, key: str) -> str:
s3_resource = boto3.resource("s3", **creds)
return s3_resource.Object(bucket_name, key).content_length / 1_000_000
def create_presigned_url(bucket_name, object_name, expiration=3600):
return s3_client.generate_presigned_url(
"get_object",
Params={"Bucket": bucket_name, "Key": object_name},
ExpiresIn=expiration,
)