vidformer
vidformer-py is a Python 🐍 interface for vidformer.
Quick links:
"""
vidformer-py is a Python 🐍 interface for [vidformer](https://github.com/ixlab/vidformer).

**Quick links:**
* [📦 PyPI](https://pypi.org/project/vidformer/)
* [📘 Documentation - vidformer-py](https://ixlab.github.io/vidformer/vidformer-py/pdoc/)
* [📘 Documentation - vidformer.cv2](https://ixlab.github.io/vidformer/vidformer-py/pdoc/vidformer/cv2.html)
* [📘 Documentation - vidformer.supervision](https://ixlab.github.io/vidformer/vidformer-py/pdoc/vidformer/supervision.html)
* [🧑💻 Source Code](https://github.com/ixlab/vidformer/tree/main/vidformer-py/)
"""

__version__ = "0.11.0"


import base64
import gzip
import json
import multiprocessing
import os
import random
import re
import socket
import struct
import subprocess
import threading
import time
import uuid
from fractions import Fraction
from urllib.parse import urlparse

import msgpack
import numpy as np
import requests

# Best-effort detection of a notebook kernel. Any failure (for example IPython
# not being importable) deliberately leaves the flag at False.
_in_notebook = False
try:
    from IPython import get_ipython

    if "IPKernelApp" in get_ipython().config:
        _in_notebook = True
except Exception:
    pass


def _wait_for_url(url, max_attempts=150, delay=0.1, timeout=5.0):
    """Poll ``url`` until it answers with HTTP 200.

    Args:
        url: Address to issue GET requests against.
        max_attempts: Maximum number of requests before giving up.
        delay: Seconds to sleep after each unsuccessful attempt.
        timeout: Per-request timeout in seconds, so that a peer which accepts
            the connection but never answers cannot block the loop forever.

    Returns:
        The stripped response body of the first 200 response, or ``None`` if
        every attempt failed.
    """
    for _ in range(max_attempts):
        try:
            response = requests.get(url, timeout=timeout)
        except requests.exceptions.RequestException:
            # Connection errors and timeouts are expected while polling.
            pass
        else:
            if response.status_code == 200:
                return response.text.strip()
        time.sleep(delay)
    return None


def _play(namespace, hls_video_url, hls_js_url, method="html", status_url=None):
    """Build something that plays (or points at) an HLS stream.

    Args:
        namespace: Suffix used in the generated element ids, so multiple
            videos in one tab don't conflict.
        hls_video_url: URL of the HLS playlist.
        hls_js_url: URL of the hls.js script loaded by the generated page.
        method: ``"html"`` returns an ``IPython.display.HTML`` player,
            ``"link"`` returns ``hls_video_url`` unchanged.
        status_url: Optional URL; when given, the generated page fetches it
            and only attaches the player once the JSON reply has a truthy
            ``ready`` field, re-polling every 250 ms until then.

    Raises:
        ValueError: If ``method`` is neither ``"html"`` nor ``"link"``.
    """
    if method == "html":
        # Imported lazily so IPython is only needed for the HTML player.
        from IPython.display import HTML

        if not status_url:
            html_code = f"""
<!DOCTYPE html>
<html>
<head>
    <title>HLS Video Player</title>
    <!-- Include hls.js library -->
    <script src="{hls_js_url}"></script>
</head>
<body>
    <video id="video-{namespace}" controls width="640" height="360" autoplay></video>
    <script>
        var video = document.getElementById('video-{namespace}');
        var videoSrc = '{hls_video_url}';

        if (Hls.isSupported()) {{
            var hls = new Hls();
            hls.loadSource(videoSrc);
            hls.attachMedia(video);
            hls.on(Hls.Events.MANIFEST_PARSED, function() {{
                video.play();
            }});
        }} else if (video.canPlayType('application/vnd.apple.mpegurl')) {{
            video.src = videoSrc;
            video.addEventListener('loadedmetadata', function() {{
                video.play();
            }});
        }} else {{
            console.error('This browser does not appear to support HLS.');
        }}
    </script>
</body>
</html>
"""
            return HTML(data=html_code)
        else:
            html_code = f"""
<!DOCTYPE html>
<html>
<head>
    <title>HLS Video Player</title>
    <script src="{hls_js_url}"></script>
</head>
<body>
    <div id="container-{namespace}"></div>
    <script>
        var statusUrl = '{status_url}';
        var videoSrc = '{hls_video_url}';
        var videoNamespace = '{namespace}';

        function showWaiting() {{
            document.getElementById('container-{namespace}').textContent = 'Waiting...';
            pollStatus();
        }}

        function pollStatus() {{
            setTimeout(function() {{
                fetch(statusUrl)
                    .then(r => r.json())
                    .then(res => {{
                        if (res.ready) {{
                            document.getElementById('container-{namespace}').textContent = '';
                            attachHls();
                        }} else {{
                            pollStatus();
                        }}
                    }})
                    .catch(e => {{
                        console.error(e);
                        pollStatus();
                    }});
            }}, 250);
        }}

        function attachHls() {{
            var container = document.getElementById('container-{namespace}');
            container.textContent = '';
            var video = document.createElement('video');
            video.id = 'video-' + videoNamespace;
            video.controls = true;
            video.width = 640;
            video.height = 360;
            container.appendChild(video);
            if (Hls.isSupported()) {{
                var hls = new Hls();
                hls.loadSource(videoSrc);
                hls.attachMedia(video);
                hls.on(Hls.Events.MANIFEST_PARSED, function() {{
                    video.play();
                }});
            }} else if (video.canPlayType('application/vnd.apple.mpegurl')) {{
                video.src = videoSrc;
                video.addEventListener('loadedmetadata', function() {{
                    video.play();
                }});
            }}
        }}

        fetch(statusUrl)
            .then(r => r.json())
            .then(res => {{
                if (res.ready) {{
                    attachHls();
                }} else {{
                    showWaiting();
                }}
            }})
            .catch(e => {{
                console.error(e);
                showWaiting();
            }});
    </script>
</body>
</html>
"""
            return HTML(data=html_code)
    elif method == "link":
        return hls_video_url
    else:
        raise ValueError("Invalid method")


def _fits_i16(value) -> bool:
    """Return True if ``value`` is exactly an ``int`` in the signed 16-bit range."""
    # ``type(...) is int`` on purpose: bool is a subclass of int and must not match.
    return type(value) is int and -(2**15) <= value < 2**15


def _feb_expr_coded_as_scalar(expr) -> bool:
    """Tell whether ``expr`` is written inline instead of through a pointer.

    Filter expressions, lists with more than three members, and short lists
    holding anything other than signed 16-bit ints are not inline; every other
    accepted value is.
    """
    if type(expr) is tuple:
        expr = list(expr)
    if type(expr) is FilterExpr:
        return False
    if type(expr) is list:
        if len(expr) > 3:
            return False
        return all(_fits_i16(x) for x in expr)
    assert type(expr) in [int, float, str, bytes, SourceExpr, bool]
    return True


class _FrameExpressionBlock:
    """Flat, table-based encoding of a batch of frame expressions.

    Every entry of ``_exprs`` is one integer word. Plain 32-bit ints are
    stored untagged in the low 32 bits; everything else carries a tag in the
    top byte, as written by the methods below:

    * ``0x01`` bool, ``0x02`` float (IEEE-754 single bit pattern)
    * ``0x03``..``0x06`` list of 0..3 signed 16-bit ints packed inline
    * ``0x40`` index into ``_literals``
    * ``0x41`` filter header (arg count, kwarg count, index into ``_functions``)
    * ``0x42`` list header (member count)
    * ``0x43`` / ``0x44`` source frame by position / by index into ``_source_fracs``
    * ``0x45`` pointer to another entry of ``_exprs``
    * ``0x46`` index into ``_kwarg_keys``
    """

    def __init__(self):
        self._functions = []
        self._literals = []
        self._sources = []
        self._kwarg_keys = []
        self._source_fracs = []
        self._exprs = []
        self._frame_exprs = []

    def __len__(self):
        """Number of top-level frames inserted with ``insert_frame``."""
        return len(self._frame_exprs)

    @staticmethod
    def _intern(table, value):
        """Return the index of ``value`` in ``table``, appending it if absent."""
        try:
            return table.index(value)
        except ValueError:
            table.append(value)
            return len(table) - 1

    def _literal_word(self, data):
        """Store ``data`` in the literal table and return the word referencing it."""
        self._literals.append(_json_arg(data, skip_data_anot=True))
        return 0x40000000_00000000 | (len(self._literals) - 1)

    def insert_expr(self, expr):
        """Insert a frame or data expression and return its index in ``_exprs``."""
        if type(expr) is SourceExpr or type(expr) is FilterExpr:
            return self.insert_frame_expr(expr)
        else:
            return self.insert_data_expr(expr)

    def insert_data_expr(self, data):
        """Encode a data value and return the index of its (header) word.

        Accepts bool, int, float, str, bytes, and lists/tuples of those.

        Raises:
            Exception: If ``data`` has an unsupported type.
        """
        if type(data) is tuple:
            data = list(data)

        if type(data) is list:
            return self._insert_list_expr(data)

        if type(data) is bool:
            word = 0x01000000_00000000 | int(data)
        elif type(data) is int:
            if -(2**31) <= data < 2**31:
                word = data & 0xFFFFFFFF
            else:
                # Does not fit in 32 bits: goes through the literal table.
                word = self._literal_word(data)
        elif type(data) is float:
            # Bit pattern of the value as a big-endian single-precision float.
            # Explicit byte order keeps this independent of the host and of
            # the Python version (int.from_bytes only defaults byteorder
            # since 3.11).
            bits = struct.unpack(">I", struct.pack(">f", data))[0]
            word = 0x02000000_00000000 | bits
        elif type(data) is str or type(data) is bytes:
            word = self._literal_word(data)
        else:
            raise Exception("Invalid data type")

        self._exprs.append(word)
        return len(self._exprs) - 1

    def _insert_list_expr(self, data):
        """Encode a list and return the index of its word / header word."""
        if len(data) <= 3 and all(_fits_i16(x) for x in data):
            # Up to three small ints are packed into a single word, first
            # member in the highest 16-bit lane; the tag is 0x03 + length.
            packed = 0
            for member in data:
                packed = (packed << 16) | (member & 0xFFFF)
            self._exprs.append(((0x03 + len(data)) << 56) | packed)
            return len(self._exprs) - 1

        # Members that are not inline-coded are emitted first and referenced
        # from the list body through pointers.
        # NOTE(review): inline-coded SourceExpr members end up in
        # insert_data_expr, which rejects them — confirm whether lists of
        # frames are meant to be supported here.
        member_idxs = []
        for member in data:
            if _feb_expr_coded_as_scalar(member):
                member_idxs.append(None)
            else:
                member_idxs.append(self.insert_data_expr(member))

        # Captured only now, so the returned index is the list header itself
        # and not the first nested member emitted above.
        out = len(self._exprs)
        self._exprs.append(0x42000000_00000000 | len(data))

        for member, idx in zip(data, member_idxs):
            if idx is None:
                self.insert_data_expr(member)
            else:
                self._exprs.append(0x45000000_00000000 | idx)

        return out

    def insert_frame_expr(self, frame):
        """Encode a ``SourceExpr`` or ``FilterExpr`` and return its index.

        For a filter, the returned index is that of the filter header word;
        positional arguments follow it, then one key word plus one value per
        keyword argument.

        Raises:
            Exception: If ``frame`` is neither a ``SourceExpr`` nor a
                ``FilterExpr``.
        """
        if type(frame) is SourceExpr:
            source_idx = self._intern(self._sources, frame._source._name)
            if frame._is_iloc:
                self._exprs.append(
                    0x43000000_00000000 | (source_idx << 32) | frame._idx
                )
            else:
                # Timestamps are stored as numerator/denominator pairs.
                frac_idx = len(self._source_fracs) // 2
                self._source_fracs.append(frame._idx.numerator)
                self._source_fracs.append(frame._idx.denominator)
                self._exprs.append(
                    0x44000000_00000000 | (source_idx << 32) | frac_idx
                )
            return len(self._exprs) - 1
        elif type(frame) is FilterExpr:
            func_idx = self._intern(self._functions, frame._filter._func)
            len_args = len(frame._args)
            len_kwargs = len(frame._kwargs)

            # Emit every non-inline argument before the header so the header
            # can point back at it.
            arg_idxs = []
            for arg in frame._args:
                if _feb_expr_coded_as_scalar(arg):
                    arg_idxs.append(None)
                else:
                    arg_idxs.append(self.insert_expr(arg))
            kwarg_idxs = {}
            for key, value in frame._kwargs.items():
                if _feb_expr_coded_as_scalar(value):
                    kwarg_idxs[key] = None
                else:
                    kwarg_idxs[key] = self.insert_expr(value)

            out_idx = len(self._exprs)
            self._exprs.append(
                0x41000000_00000000 | (len_args << 24) | (len_kwargs << 16) | func_idx
            )
            for arg, idx in zip(frame._args, arg_idxs):
                if idx is None:
                    # It's a scalar
                    self.insert_expr(arg)
                else:
                    # It's an expression pointer
                    self._exprs.append(0x45000000_00000000 | idx)
            for key, value in frame._kwargs.items():
                key_idx = self._intern(self._kwarg_keys, key)
                self._exprs.append(0x46000000_00000000 | key_idx)
                if kwarg_idxs[key] is None:
                    # It's a scalar
                    self.insert_expr(value)
                else:
                    # It's an expression pointer
                    self._exprs.append(0x45000000_00000000 | kwarg_idxs[key])
            return out_idx
        else:
            raise Exception("Invalid frame type")

    def insert_frame(self, frame):
        """Encode ``frame`` and record it as the next top-level frame."""
        idx = self.insert_frame_expr(frame)
        self._frame_exprs.append(idx)

    def as_dict(self):
        """Return the block's tables as a dict (the live lists, not copies)."""
        return {
            "functions": self._functions,
            "literals": self._literals,
            "sources": self._sources,
            "kwarg_keys": self._kwarg_keys,
            "source_fracs": self._source_fracs,
            "exprs": self._exprs,
            "frame_exprs": self._frame_exprs,
        }


class IgniSource:
    """A video source, built from its id and a source description mapping.

    ``src`` must provide ``width``, ``height``, ``pix_fmt`` and ``ts``, the
    latter being a sequence of ``(numerator, denominator)`` pairs.
    """

    def __init__(self, id: str, src):
        self._name = id
        self._fmt = {
            "width": src["width"],
            "height": src["height"],
            "pix_fmt": src["pix_fmt"],
        }
        self._ts = [Fraction(x[0], x[1]) for x in src["ts"]]
        # Positional (integer) frame access: ``source.iloc[i]``.
        self.iloc = _SourceILoc(self)

    def id(self) -> str:
        """Return the source id."""
        return self._name

    def fmt(self):
        """Return a copy of the width / height / pix_fmt mapping."""
        return {**self._fmt}

    def ts(self) -> list[Fraction]:
        """Return a copy of the frame timestamps."""
        return self._ts.copy()

    def __len__(self):
        """Number of timestamps in the source."""
        return len(self._ts)

    def __getitem__(self, idx):
        """Return the frame expression at timestamp ``idx``.

        Raises:
            Exception: If ``idx`` is not exactly a ``Fraction``.
        """
        if type(idx) is not Fraction:
            raise Exception("Source index must be a Fraction")
        return SourceExpr(self, idx, False)

    def __repr__(self):
        return f"IgniSource({self._name})"


class IgniSpec:
    """A spec, built from its id and a spec description mapping.

    ``src`` must provide ``width``, ``height``, ``pix_fmt`` and
    ``vod_endpoint``. The hls.js URL is derived from the scheme and host of
    ``vod_endpoint``.
    """

    def __init__(self, id: str, src):
        self._id = id
        self._fmt = {
            "width": src["width"],
            "height": src["height"],
            "pix_fmt": src["pix_fmt"],
        }
        self._vod_endpoint = src["vod_endpoint"]
        parsed_url = urlparse(self._vod_endpoint)
        self._hls_js_url = f"{parsed_url.scheme}://{parsed_url.netloc}/hls.js"

    def id(self) -> str:
        """Return the spec id."""
        return self._id

    def play(self, *args, **kwargs):
        """Play the spec; extra arguments are forwarded to ``_play``.

        The playlist and status URLs are formed by appending to
        ``vod_endpoint`` directly, with no separator added.
        """
        url = f"{self._vod_endpoint}playlist.m3u8"
        status_url = f"{self._vod_endpoint}status"
        hls_js_url = self._hls_js_url
        return _play(self._id, url, hls_js_url, *args, **kwargs, status_url=status_url)


448class IgniServer: 449 def __init__(self, endpoint: str, api_key: str): 450 if not endpoint.startswith("http://") and not endpoint.startswith("https://"): 451 raise Exception("Endpoint must start with http:// or https://") 452 if endpoint.endswith("/"): 453 raise Exception("Endpoint must not end with /") 454 self._endpoint = endpoint 455 456 self._api_key = api_key 457 self._session = requests.Session() 458 self._session.headers.update({"Authorization": f"Bearer {self._api_key}"}) 459 response = self._session.get( 460 f"{self._endpoint}/auth", 461 headers={"Authorization": f"Bearer {self._api_key}"}, 462 ) 463 if not response.ok: 464 raise Exception(response.text) 465 response = response.json() 466 assert response["status"] == "ok" 467 468 def get_source(self, id: str) -> IgniSource: 469 assert type(id) is str 470 response = self._session.get( 471 f"{self._endpoint}/source/{id}", 472 headers={"Authorization": f"Bearer {self._api_key}"}, 473 ) 474 if not response.ok: 475 raise Exception(response.text) 476 response = response.json() 477 return IgniSource(response["id"], response) 478 479 def list_sources(self) -> list[str]: 480 response = self._session.get( 481 f"{self._endpoint}/source", 482 headers={"Authorization": f"Bearer {self._api_key}"}, 483 ) 484 if not response.ok: 485 raise Exception(response.text) 486 response = response.json() 487 return response 488 489 def delete_source(self, id: str): 490 assert type(id) is str 491 response = self._session.delete( 492 f"{self._endpoint}/source/{id}", 493 headers={"Authorization": f"Bearer {self._api_key}"}, 494 ) 495 if not response.ok: 496 raise Exception(response.text) 497 response = response.json() 498 assert response["status"] == "ok" 499 500 def search_source( 501 self, name, stream_idx, storage_service, storage_config 502 ) -> list[str]: 503 assert type(name) is str 504 assert type(stream_idx) is int 505 assert type(storage_service) is str 506 assert type(storage_config) is dict 507 for k, v in 
storage_config.items(): 508 assert type(k) is str 509 assert type(v) is str 510 req = { 511 "name": name, 512 "stream_idx": stream_idx, 513 "storage_service": storage_service, 514 "storage_config": storage_config, 515 } 516 response = self._session.post( 517 f"{self._endpoint}/source/search", 518 json=req, 519 headers={"Authorization": f"Bearer {self._api_key}"}, 520 ) 521 if not response.ok: 522 raise Exception(response.text) 523 response = response.json() 524 return response 525 526 def create_source( 527 self, name, stream_idx, storage_service, storage_config 528 ) -> IgniSource: 529 assert type(name) is str 530 assert type(stream_idx) is int 531 assert type(storage_service) is str 532 assert type(storage_config) is dict 533 for k, v in storage_config.items(): 534 assert type(k) is str 535 assert type(v) is str 536 req = { 537 "name": name, 538 "stream_idx": stream_idx, 539 "storage_service": storage_service, 540 "storage_config": storage_config, 541 } 542 response = self._session.post( 543 f"{self._endpoint}/source", 544 json=req, 545 headers={"Authorization": f"Bearer {self._api_key}"}, 546 ) 547 if not response.ok: 548 raise Exception(response.text) 549 response = response.json() 550 assert response["status"] == "ok" 551 id = response["id"] 552 return self.get_source(id) 553 554 def source(self, name, stream_idx, storage_service, storage_config) -> IgniSource: 555 """Convenience function for accessing sources. 556 557 Tries to find a source with the given name, stream_idx, storage_service, and storage_config. 558 If no source is found, creates a new source with the given parameters. 
559 """ 560 561 sources = self.search_source(name, stream_idx, storage_service, storage_config) 562 if len(sources) == 0: 563 return self.create_source(name, stream_idx, storage_service, storage_config) 564 return self.get_source(sources[0]) 565 566 def get_spec(self, id: str) -> IgniSpec: 567 assert type(id) is str 568 response = self._session.get( 569 f"{self._endpoint}/spec/{id}", 570 headers={"Authorization": f"Bearer {self._api_key}"}, 571 ) 572 if not response.ok: 573 raise Exception(response.text) 574 response = response.json() 575 return IgniSpec(response["id"], response) 576 577 def list_specs(self) -> list[str]: 578 response = self._session.get( 579 f"{self._endpoint}/spec", 580 headers={"Authorization": f"Bearer {self._api_key}"}, 581 ) 582 if not response.ok: 583 raise Exception(response.text) 584 response = response.json() 585 return response 586 587 def create_spec( 588 self, 589 width, 590 height, 591 pix_fmt, 592 vod_segment_length, 593 frame_rate, 594 ready_hook=None, 595 steer_hook=None, 596 ttl=None, 597 ) -> IgniSpec: 598 assert type(width) is int 599 assert type(height) is int 600 assert type(pix_fmt) is str 601 assert type(vod_segment_length) is Fraction 602 assert type(frame_rate) is Fraction 603 assert type(ready_hook) is str or ready_hook is None 604 assert type(steer_hook) is str or steer_hook is None 605 assert ttl is None or type(ttl) is int 606 607 req = { 608 "width": width, 609 "height": height, 610 "pix_fmt": pix_fmt, 611 "vod_segment_length": [ 612 vod_segment_length.numerator, 613 vod_segment_length.denominator, 614 ], 615 "frame_rate": [frame_rate.numerator, frame_rate.denominator], 616 "ready_hook": ready_hook, 617 "steer_hook": steer_hook, 618 "ttl": ttl, 619 } 620 response = self._session.post( 621 f"{self._endpoint}/spec", 622 json=req, 623 headers={"Authorization": f"Bearer {self._api_key}"}, 624 ) 625 if not response.ok: 626 raise Exception(response.text) 627 response = response.json() 628 assert response["status"] == "ok" 
629 return self.get_spec(response["id"]) 630 631 def delete_spec(self, id: str): 632 assert type(id) is str 633 response = self._session.delete( 634 f"{self._endpoint}/spec/{id}", 635 headers={"Authorization": f"Bearer {self._api_key}"}, 636 ) 637 if not response.ok: 638 raise Exception(response.text) 639 response = response.json() 640 assert response["status"] == "ok" 641 642 def push_spec_part(self, spec_id, pos, frames, terminal): 643 if type(spec_id) is IgniSpec: 644 spec_id = spec_id._id 645 assert type(spec_id) is str 646 assert type(pos) is int 647 assert type(frames) is list 648 assert type(terminal) is bool 649 650 req_frames = [] 651 for frame in frames: 652 assert type(frame) is tuple 653 assert len(frame) == 2 654 t = frame[0] 655 f = frame[1] 656 assert type(t) is Fraction 657 assert f is None or type(f) is SourceExpr or type(f) is FilterExpr 658 req_frames.append( 659 [ 660 [t.numerator, t.denominator], 661 f._to_json_spec() if f is not None else None, 662 ] 663 ) 664 665 req = { 666 "pos": pos, 667 "frames": req_frames, 668 "terminal": terminal, 669 } 670 response = self._session.post( 671 f"{self._endpoint}/spec/{spec_id}/part", 672 json=req, 673 headers={"Authorization": f"Bearer {self._api_key}"}, 674 ) 675 if not response.ok: 676 raise Exception(response.text) 677 response = response.json() 678 assert response["status"] == "ok" 679 680 def push_spec_part_block( 681 self, spec_id: str, pos, blocks, terminal, compression="gzip" 682 ): 683 if type(spec_id) is IgniSpec: 684 spec_id = spec_id._id 685 assert type(spec_id) is str 686 assert type(pos) is int 687 assert type(blocks) is list 688 assert type(terminal) is bool 689 assert compression is None or compression == "gzip" 690 691 req_blocks = [] 692 for block in blocks: 693 assert type(block) is _FrameExpressionBlock 694 block_body = block.as_dict() 695 block_frames = len(block_body["frame_exprs"]) 696 block_body = json.dumps(block_body).encode("utf-8") 697 if compression == "gzip": 698 block_body 
= gzip.compress(block_body, 1) 699 block_body = base64.b64encode(block_body).decode("utf-8") 700 req_blocks.append( 701 { 702 "frames": block_frames, 703 "compression": compression, 704 "body": block_body, 705 } 706 ) 707 708 req = { 709 "pos": pos, 710 "terminal": terminal, 711 "blocks": req_blocks, 712 } 713 response = self._session.post( 714 f"{self._endpoint}/spec/{spec_id}/part_block", 715 json=req, 716 headers={"Authorization": f"Bearer {self._api_key}"}, 717 ) 718 if not response.ok: 719 raise Exception(response.text) 720 response = response.json() 721 assert response["status"] == "ok" 722 723 def frame(self, width, height, pix_fmt, frame_expr, compression="gzip"): 724 assert type(frame_expr) is FilterExpr or type(frame_expr) is SourceExpr 725 assert compression is None or compression in ["gzip"] 726 feb = _FrameExpressionBlock() 727 feb.insert_frame(frame_expr) 728 feb_body = feb.as_dict() 729 730 feb_body = json.dumps(feb_body).encode("utf-8") 731 if compression == "gzip": 732 feb_body = gzip.compress(feb_body, 1) 733 feb_body = base64.b64encode(feb_body).decode("utf-8") 734 req = { 735 "width": width, 736 "height": height, 737 "pix_fmt": pix_fmt, 738 "compression": compression, 739 "block": { 740 "frames": 1, 741 "compression": compression, 742 "body": feb_body, 743 }, 744 } 745 response = self._session.post( 746 f"{self._endpoint}/frame", 747 json=req, 748 headers={"Authorization": f"Bearer {self._api_key}"}, 749 ) 750 if not response.ok: 751 raise Exception(response.text) 752 response_body = response.content 753 assert type(response_body) is bytes 754 if compression == "gzip": 755 response_body = gzip.decompress(response_body) 756 return response_body 757 758 759class YrdenSpec: 760 """ 761 A video transformation specification. 762 763 See https://ixlab.github.io/vidformer/concepts.html for more information. 
764 """ 765 766 def __init__(self, domain: list[Fraction], render, fmt: dict): 767 self._domain = domain 768 self._render = render 769 self._fmt = fmt 770 771 def __repr__(self): 772 if len(self._domain) <= 20: 773 lines = [] 774 for i, t in enumerate(self._domain): 775 frame_expr = self._render(t, i) 776 lines.append( 777 f"{t.numerator}/{t.denominator} => {frame_expr}", 778 ) 779 return "\n".join(lines) 780 else: 781 lines = [] 782 for i, t in enumerate(self._domain[:10]): 783 frame_expr = self._render(t, i) 784 lines.append( 785 f"{t.numerator}/{t.denominator} => {frame_expr}", 786 ) 787 lines.append("...") 788 for i, t in enumerate(self._domain[-10:]): 789 frame_expr = self._render(t, i) 790 lines.append( 791 f"{t.numerator}/{t.denominator} => {frame_expr}", 792 ) 793 return "\n".join(lines) 794 795 def _sources(self): 796 s = set() 797 for i, t in enumerate(self._domain): 798 frame_expr = self._render(t, i) 799 s = s.union(frame_expr._sources()) 800 return s 801 802 def _to_json_spec(self): 803 frames = [] 804 s = set() 805 f = {} 806 for i, t in enumerate(self._domain): 807 frame_expr = self._render(t, i) 808 s = s.union(frame_expr._sources()) 809 f = {**f, **frame_expr._filters()} 810 frame = [[t.numerator, t.denominator], frame_expr._to_json_spec()] 811 frames.append(frame) 812 return {"frames": frames}, s, f 813 814 def play(self, server, method="html", verbose=False): 815 """Play the video live in the notebook.""" 816 817 spec, sources, filters = self._to_json_spec() 818 spec_json_bytes = json.dumps(spec).encode("utf-8") 819 spec_obj_json_gzip = gzip.compress(spec_json_bytes, compresslevel=1) 820 spec_obj_json_gzip_b64 = base64.b64encode(spec_obj_json_gzip).decode("utf-8") 821 822 sources = [ 823 { 824 "name": s._name, 825 "path": s._path, 826 "stream": s._stream, 827 "service": s._service.as_json() if s._service is not None else None, 828 } 829 for s in sources 830 ] 831 filters = { 832 k: { 833 "filter": v._func, 834 "args": v._kwargs, 835 } 836 for k, 
v in filters.items() 837 } 838 839 if verbose: 840 print(f"Sending to server. Spec is {len(spec_obj_json_gzip_b64)} bytes") 841 842 resp = server._new(spec_obj_json_gzip_b64, sources, filters, self._fmt) 843 hls_video_url = resp["stream_url"] 844 hls_player_url = resp["player_url"] 845 namespace = resp["namespace"] 846 hls_js_url = server.hls_js_url() 847 848 if method == "link": 849 return hls_video_url 850 if method == "player": 851 return hls_player_url 852 if method == "iframe": 853 from IPython.display import IFrame 854 855 return IFrame(hls_player_url, width=1280, height=720) 856 if method == "html": 857 from IPython.display import HTML 858 859 # We add a namespace to the video element to avoid conflicts with other videos 860 html_code = f""" 861<!DOCTYPE html> 862<html> 863<head> 864 <title>HLS Video Player</title> 865 <!-- Include hls.js library --> 866 <script src="{hls_js_url}"></script> 867</head> 868<body> 869 <!-- Video element --> 870 <video id="video-{namespace}" controls width="640" height="360" autoplay></video> 871 <script> 872 var video = document.getElementById('video-{namespace}'); 873 var videoSrc = '{hls_video_url}'; 874 var hls = new Hls(); 875 hls.loadSource(videoSrc); 876 hls.attachMedia(video); 877 hls.on(Hls.Events.MANIFEST_PARSED, function() {{ 878 video.play(); 879 }}); 880 </script> 881</body> 882</html> 883""" 884 return HTML(data=html_code) 885 else: 886 return hls_player_url 887 888 def load(self, server): 889 spec, sources, filters = self._to_json_spec() 890 spec_json_bytes = json.dumps(spec).encode("utf-8") 891 spec_obj_json_gzip = gzip.compress(spec_json_bytes, compresslevel=1) 892 spec_obj_json_gzip_b64 = base64.b64encode(spec_obj_json_gzip).decode("utf-8") 893 894 sources = [ 895 { 896 "name": s._name, 897 "path": s._path, 898 "stream": s._stream, 899 "service": s._service.as_json() if s._service is not None else None, 900 } 901 for s in sources 902 ] 903 filters = { 904 k: { 905 "filter": v._func, 906 "args": v._kwargs, 907 } 
908 for k, v in filters.items() 909 } 910 resp = server._new(spec_obj_json_gzip_b64, sources, filters, self._fmt) 911 namespace = resp["namespace"] 912 return YrdenLoader(server, namespace, self._domain) 913 914 def save(self, server, pth, encoder=None, encoder_opts=None, format=None): 915 """Save the video to a file.""" 916 917 assert encoder is None or type(encoder) is str 918 assert encoder_opts is None or type(encoder_opts) is dict 919 if encoder_opts is not None: 920 for k, v in encoder_opts.items(): 921 assert type(k) is str and type(v) is str 922 923 spec, sources, filters = self._to_json_spec() 924 spec_json_bytes = json.dumps(spec).encode("utf-8") 925 spec_obj_json_gzip = gzip.compress(spec_json_bytes, compresslevel=1) 926 spec_obj_json_gzip_b64 = base64.b64encode(spec_obj_json_gzip).decode("utf-8") 927 928 sources = [ 929 { 930 "name": s._name, 931 "path": s._path, 932 "stream": s._stream, 933 "service": s._service.as_json() if s._service is not None else None, 934 } 935 for s in sources 936 ] 937 filters = { 938 k: { 939 "filter": v._func, 940 "args": v._kwargs, 941 } 942 for k, v in filters.items() 943 } 944 945 resp = server._export( 946 pth, 947 spec_obj_json_gzip_b64, 948 sources, 949 filters, 950 self._fmt, 951 encoder, 952 encoder_opts, 953 format, 954 ) 955 956 return resp 957 958 def _vrod_bench(self, server): 959 out = {} 960 pth = "spec.json" 961 start_t = time.time() 962 with open(pth, "w") as outfile: 963 spec, sources, filters = self._to_json_spec() 964 outfile.write(json.dumps(spec)) 965 966 sources = [ 967 { 968 "name": s._name, 969 "path": s._path, 970 "stream": s._stream, 971 "service": s._service.as_json() if s._service is not None else None, 972 } 973 for s in sources 974 ] 975 filters = { 976 k: { 977 "filter": v._func, 978 "args": v._kwargs, 979 } 980 for k, v in filters.items() 981 } 982 end_t = time.time() 983 out["vrod_create_spec"] = end_t - start_t 984 985 start = time.time() 986 resp = server._new(pth, sources, filters, 
self._fmt) 987 end = time.time() 988 out["vrod_register"] = end - start 989 990 stream_url = resp["stream_url"] 991 first_segment = stream_url.replace("stream.m3u8", "segment-0.ts") 992 993 start = time.time() 994 r = requests.get(first_segment) 995 r.raise_for_status() 996 end = time.time() 997 out["vrod_first_segment"] = end - start 998 return out 999 1000 def _dve2_bench(self, server): 1001 pth = "spec.json" 1002 out = {} 1003 start_t = time.time() 1004 with open(pth, "w") as outfile: 1005 spec, sources, filters = self._to_json_spec() 1006 outfile.write(json.dumps(spec)) 1007 1008 sources = [ 1009 { 1010 "name": s._name, 1011 "path": s._path, 1012 "stream": s._stream, 1013 "service": s._service.as_json() if s._service is not None else None, 1014 } 1015 for s in sources 1016 ] 1017 filters = { 1018 k: { 1019 "filter": v._func, 1020 "args": v._kwargs, 1021 } 1022 for k, v in filters.items() 1023 } 1024 end_t = time.time() 1025 out["dve2_create_spec"] = end_t - start_t 1026 1027 start = time.time() 1028 resp = server._export(pth, sources, filters, self._fmt, None, None) 1029 resp.raise_for_status() 1030 end = time.time() 1031 out["dve2_exec"] = end - start 1032 return out 1033 1034 1035class YrdenLoader: 1036 def __init__(self, server, namespace: str, domain): 1037 self._server = server 1038 self._namespace = namespace 1039 self._domain = domain 1040 1041 def _chunk(self, start_i, end_i): 1042 return self._server._raw(self._namespace, start_i, end_i) 1043 1044 def __len__(self): 1045 return len(self._domain) 1046 1047 def _find_index_by_rational(self, value): 1048 if value not in self._domain: 1049 raise ValueError(f"Rational timestamp {value} is not in the domain") 1050 return self._domain.index(value) 1051 1052 def __getitem__(self, index): 1053 if isinstance(index, slice): 1054 start = index.start if index.start is not None else 0 1055 end = index.stop if index.stop is not None else len(self._domain) 1056 assert start >= 0 and start < len(self._domain) 1057 
assert end >= 0 and end <= len(self._domain) 1058 assert start <= end 1059 num_frames = end - start 1060 all_bytes = self._chunk(start, end - 1) 1061 all_bytes_len = len(all_bytes) 1062 assert all_bytes_len % num_frames == 0 1063 return [ 1064 all_bytes[ 1065 i 1066 * all_bytes_len 1067 // num_frames : (i + 1) 1068 * all_bytes_len 1069 // num_frames 1070 ] 1071 for i in range(num_frames) 1072 ] 1073 elif isinstance(index, int): 1074 assert index >= 0 and index < len(self._domain) 1075 return self._chunk(index, index) 1076 else: 1077 raise TypeError( 1078 "Invalid argument type for iloc. Use a slice or an integer." 1079 ) 1080 1081 1082class YrdenServer: 1083 """ 1084 A connection to a Yrden server. 1085 1086 A yrden server is the main API for local use of vidformer. 1087 """ 1088 1089 def __init__(self, domain=None, port=None, bin=None, hls_prefix=None): 1090 """ 1091 Connect to a Yrden server 1092 1093 Can either connect to an existing server, if domain and port are provided, or start a new server using the provided binary. 1094 If no domain or binary is provided, the `VIDFORMER_BIN` environment variable is used. 
1095 """ 1096 1097 self._domain = domain 1098 self._port = port 1099 self._proc = None 1100 if self._port is None: 1101 if bin is None: 1102 if os.getenv("VIDFORMER_BIN") is not None: 1103 bin = os.getenv("VIDFORMER_BIN") 1104 else: 1105 bin = "vidformer-cli" 1106 1107 self._domain = "localhost" 1108 self._port = random.randint(49152, 65535) 1109 cmd = [bin, "yrden", "--port", str(self._port)] 1110 if _in_notebook: 1111 # We need to print the URL in the notebook 1112 # This is a trick to get VS Code to forward the port 1113 cmd += ["--print-url"] 1114 1115 if hls_prefix is not None: 1116 if type(hls_prefix) is not str: 1117 raise Exception("hls_prefix must be a string") 1118 cmd += ["--hls-prefix", hls_prefix] 1119 1120 self._proc = subprocess.Popen(cmd) 1121 1122 version = _wait_for_url(f"http://{self._domain}:{self._port}/") 1123 if version is None: 1124 raise Exception("Failed to connect to server") 1125 1126 expected_version = f"vidformer-yrden v{__version__}" 1127 if version != expected_version: 1128 print( 1129 f"Warning: Expected version `{expected_version}`, got `{version}`. API may not be compatible!" 
1130 ) 1131 1132 def _source(self, name: str, path: str, stream: int, service): 1133 r = requests.post( 1134 f"http://{self._domain}:{self._port}/source", 1135 json={ 1136 "name": name, 1137 "path": path, 1138 "stream": stream, 1139 "service": service.as_json() if service is not None else None, 1140 }, 1141 ) 1142 if not r.ok: 1143 raise Exception(r.text) 1144 1145 resp = r.json() 1146 resp["ts"] = [Fraction(x[0], x[1]) for x in resp["ts"]] 1147 return resp 1148 1149 def _new(self, spec, sources, filters, fmt): 1150 req = { 1151 "spec": spec, 1152 "sources": sources, 1153 "filters": filters, 1154 "width": fmt["width"], 1155 "height": fmt["height"], 1156 "pix_fmt": fmt["pix_fmt"], 1157 } 1158 1159 r = requests.post(f"http://{self._domain}:{self._port}/new", json=req) 1160 if not r.ok: 1161 raise Exception(r.text) 1162 1163 return r.json() 1164 1165 def _export(self, pth, spec, sources, filters, fmt, encoder, encoder_opts, format): 1166 req = { 1167 "spec": spec, 1168 "sources": sources, 1169 "filters": filters, 1170 "width": fmt["width"], 1171 "height": fmt["height"], 1172 "pix_fmt": fmt["pix_fmt"], 1173 "output_path": pth, 1174 "encoder": encoder, 1175 "encoder_opts": encoder_opts, 1176 "format": format, 1177 } 1178 1179 r = requests.post(f"http://{self._domain}:{self._port}/export", json=req) 1180 if not r.ok: 1181 raise Exception(r.text) 1182 1183 return r.json() 1184 1185 def _raw(self, namespace, start_i, end_i): 1186 r = requests.get( 1187 f"http://{self._domain}:{self._port}/{namespace}/raw/{start_i}-{end_i}" 1188 ) 1189 if not r.ok: 1190 raise Exception(r.text) 1191 return r.content 1192 1193 def hls_js_url(self): 1194 """Return the link to the yrden-hosted hls.js file""" 1195 return f"http://{self._domain}:{self._port}/hls.js" 1196 1197 def __del__(self): 1198 if self._proc is not None: 1199 self._proc.kill() 1200 1201 1202class YrdenSource: 1203 """A video source.""" 1204 1205 def __init__( 1206 self, server: YrdenServer, name: str, path: str, stream: int, 
service=None 1207 ): 1208 if service is None: 1209 # check if path is a http URL and, if so, automatically set the service 1210 # for example, the following code should work with just vf.Source(server, "tos_720p", "https://f.dominik.win/data/dve2/tos_720p.mp4") 1211 # this creates a storage service with endpoint "https://f.dominik.win/" and path "data/dve2/tos_720p.mp4" 1212 # don't use the root parameter in this case 1213 1214 match = re.match(r"(http|https)://([^/]+)(.*)", path) 1215 if match is not None: 1216 endpoint = f"{match.group(1)}://{match.group(2)}" 1217 path = match.group(3) 1218 # remove leading slash 1219 if path.startswith("/"): 1220 path = path[1:] 1221 service = YrdenStorageService("http", endpoint=endpoint) 1222 1223 self._server = server 1224 self._name = name 1225 self._path = path 1226 self._stream = stream 1227 self._service = service 1228 1229 self.iloc = _SourceILoc(self) 1230 1231 self._src = self._server._source( 1232 self._name, self._path, self._stream, self._service 1233 ) 1234 1235 def fmt(self): 1236 return { 1237 "width": self._src["width"], 1238 "height": self._src["height"], 1239 "pix_fmt": self._src["pix_fmt"], 1240 } 1241 1242 def ts(self): 1243 return self._src["ts"] 1244 1245 def __len__(self): 1246 return len(self._src["ts"]) 1247 1248 def __getitem__(self, idx): 1249 if type(idx) is not Fraction: 1250 raise Exception("Source index must be a Fraction") 1251 return SourceExpr(self, idx, False) 1252 1253 def play(self, *args, **kwargs): 1254 """Play the video live in the notebook.""" 1255 1256 domain = self.ts() 1257 1258 def render(t, _i): 1259 return self[t] 1260 1261 spec = YrdenSpec(domain, render, self.fmt()) 1262 return spec.play(*args, **kwargs) 1263 1264 1265class YrdenStorageService: 1266 def __init__(self, service: str, **kwargs): 1267 if type(service) is not str: 1268 raise Exception("Service name must be a string") 1269 self._service = service 1270 for k, v in kwargs.items(): 1271 if type(v) is not str: 1272 raise 
Exception(f"Value of {k} must be a string") 1273 self._config = kwargs 1274 1275 def as_json(self): 1276 return {"service": self._service, "config": self._config} 1277 1278 def __repr__(self): 1279 return f"{self._service}(config={self._config})" 1280 1281 1282class SourceExpr: 1283 def __init__(self, source, idx, is_iloc): 1284 self._source = source 1285 self._idx = idx 1286 self._is_iloc = is_iloc 1287 1288 def __repr__(self): 1289 if self._is_iloc: 1290 return f"{self._source._name}.iloc[{self._idx}]" 1291 else: 1292 return f"{self._source._name}[{self._idx}]" 1293 1294 def _to_json_spec(self): 1295 if self._is_iloc: 1296 return { 1297 "Source": { 1298 "video": self._source._name, 1299 "index": {"ILoc": int(self._idx)}, 1300 } 1301 } 1302 else: 1303 return { 1304 "Source": { 1305 "video": self._source._name, 1306 "index": {"T": [self._idx.numerator, self._idx.denominator]}, 1307 } 1308 } 1309 1310 def _sources(self): 1311 return set([self._source]) 1312 1313 def _filters(self): 1314 return {} 1315 1316 1317class _SourceILoc: 1318 def __init__(self, source): 1319 self._source = source 1320 1321 def __getitem__(self, idx): 1322 if type(idx) is not int: 1323 raise Exception(f"Source iloc index must be an integer, got a {type(idx)}") 1324 return SourceExpr(self._source, idx, True) 1325 1326 1327def _json_arg(arg, skip_data_anot=False): 1328 if type(arg) is FilterExpr or type(arg) is SourceExpr: 1329 return {"Frame": arg._to_json_spec()} 1330 elif type(arg) is int: 1331 if skip_data_anot: 1332 return {"Int": arg} 1333 return {"Data": {"Int": arg}} 1334 elif type(arg) is str: 1335 if skip_data_anot: 1336 return {"String": arg} 1337 return {"Data": {"String": arg}} 1338 elif type(arg) is bytes: 1339 arg = list(arg) 1340 if skip_data_anot: 1341 return {"Bytes": arg} 1342 return {"Data": {"Bytes": arg}} 1343 elif type(arg) is float: 1344 if skip_data_anot: 1345 return {"Float": arg} 1346 return {"Data": {"Float": arg}} 1347 elif type(arg) is bool: 1348 if skip_data_anot: 
1349 return {"Bool": arg} 1350 return {"Data": {"Bool": arg}} 1351 elif type(arg) is tuple or type(arg) is list: 1352 if skip_data_anot: 1353 return {"List": [_json_arg(x, True) for x in list(arg)]} 1354 return {"Data": {"List": [_json_arg(x, True) for x in list(arg)]}} 1355 else: 1356 raise Exception(f"Unknown arg type: {type(arg)}") 1357 1358 1359class Filter: 1360 """A video filter.""" 1361 1362 def __init__(self, name: str, tl_func=None, **kwargs): 1363 self._name = name 1364 1365 # tl_func is the top level func, which is the true implementation, not just a pretty name 1366 if tl_func is None: 1367 self._func = name 1368 else: 1369 self._func = tl_func 1370 1371 # filter infra args, not invocation args 1372 for k, v in kwargs.items(): 1373 if type(v) is not str: 1374 raise Exception(f"Value of {k} must be a string") 1375 self._kwargs = kwargs 1376 1377 def __call__(self, *args, **kwargs): 1378 return FilterExpr(self, args, kwargs) 1379 1380 1381class FilterExpr: 1382 def __init__(self, filter: Filter, args, kwargs): 1383 self._filter = filter 1384 self._args = args 1385 self._kwargs = kwargs 1386 1387 def __repr__(self): 1388 args = [] 1389 for arg in self._args: 1390 val = f'"{arg}"' if type(arg) is str else str(arg) 1391 args.append(str(val)) 1392 for k, v in self._kwargs.items(): 1393 val = f'"{v}"' if type(v) is str else str(v) 1394 args.append(f"{k}={val}") 1395 return f"{self._filter._name}({', '.join(args)})" 1396 1397 def _to_json_spec(self): 1398 args = [] 1399 for arg in self._args: 1400 args.append(_json_arg(arg)) 1401 kwargs = {} 1402 for k, v in self._kwargs.items(): 1403 kwargs[k] = _json_arg(v) 1404 return {"Filter": {"name": self._filter._name, "args": args, "kwargs": kwargs}} 1405 1406 def _sources(self): 1407 s = set() 1408 for arg in self._args: 1409 if type(arg) is FilterExpr or type(arg) is SourceExpr: 1410 s = s.union(arg._sources()) 1411 for arg in self._kwargs.values(): 1412 if type(arg) is FilterExpr or type(arg) is SourceExpr: 1413 s = 
s.union(arg._sources()) 1414 return s 1415 1416 def _filters(self): 1417 f = {self._filter._name: self._filter} 1418 for arg in self._args: 1419 if type(arg) is FilterExpr: 1420 f = {**f, **arg._filters()} 1421 for arg in self._kwargs.values(): 1422 if type(arg) is FilterExpr: 1423 f = {**f, **arg._filters()} 1424 return f 1425 1426 1427class UDF: 1428 """User-defined filter superclass""" 1429 1430 def __init__(self, name: str): 1431 self._name = name 1432 self._socket_path = None 1433 self._p = None 1434 1435 def filter(self, *args, **kwargs): 1436 raise Exception("User must implement the filter method") 1437 1438 def filter_type(self, *args, **kwargs): 1439 raise Exception("User must implement the filter_type method") 1440 1441 def into_filter(self): 1442 assert self._socket_path is None 1443 self._socket_path = f"/tmp/vidformer-{self._name}-{str(uuid.uuid4())}.sock" 1444 self._p = multiprocessing.Process( 1445 target=_run_udf_host, args=(self, self._socket_path) 1446 ) 1447 self._p.start() 1448 return Filter( 1449 name=self._name, tl_func="IPC", socket=self._socket_path, func=self._name 1450 ) 1451 1452 def _handle_connection(self, connection): 1453 try: 1454 while True: 1455 frame_len = connection.recv(4) 1456 if not frame_len or len(frame_len) != 4: 1457 break 1458 frame_len = int.from_bytes(frame_len, byteorder="big") 1459 data = connection.recv(frame_len) 1460 if not data: 1461 break 1462 1463 while len(data) < frame_len: 1464 new_data = connection.recv(frame_len - len(data)) 1465 if not new_data: 1466 raise Exception("Partial data received") 1467 data += new_data 1468 1469 obj = msgpack.unpackb(data, raw=False) 1470 f_op, f_args, f_kwargs = ( 1471 obj["op"], 1472 obj["args"], 1473 obj["kwargs"], 1474 ) 1475 1476 response = None 1477 if f_op == "filter": 1478 f_args = [self._deser_filter(x) for x in f_args] 1479 f_kwargs = {k: self._deser_filter(v) for k, v in f_kwargs} 1480 response = self.filter(*f_args, **f_kwargs) 1481 if type(response) is not UDFFrame: 
1482 raise Exception( 1483 f"filter must return a UDFFrame, got {type(response)}" 1484 ) 1485 if response.frame_type().pix_fmt() != "rgb24": 1486 raise Exception( 1487 f"filter must return a frame with pix_fmt 'rgb24', got {response.frame_type().pix_fmt()}" 1488 ) 1489 1490 response = response._response_ser() 1491 elif f_op == "filter_type": 1492 f_args = [self._deser_filter_type(x) for x in f_args] 1493 f_kwargs = {k: self._deser_filter_type(v) for k, v in f_kwargs} 1494 response = self.filter_type(*f_args, **f_kwargs) 1495 if type(response) is not UDFFrameType: 1496 raise Exception( 1497 f"filter_type must return a UDFFrameType, got {type(response)}" 1498 ) 1499 if response.pix_fmt() != "rgb24": 1500 raise Exception( 1501 f"filter_type must return a frame with pix_fmt 'rgb24', got {response.pix_fmt()}" 1502 ) 1503 response = response._response_ser() 1504 else: 1505 raise Exception(f"Unknown operation: {f_op}") 1506 1507 response = msgpack.packb(response, use_bin_type=True) 1508 response_len = len(response).to_bytes(4, byteorder="big") 1509 connection.sendall(response_len) 1510 connection.sendall(response) 1511 finally: 1512 connection.close() 1513 1514 def _deser_filter_type(self, obj): 1515 assert type(obj) is dict 1516 keys = list(obj.keys()) 1517 assert len(keys) == 1 1518 type_key = keys[0] 1519 assert type_key in ["FrameType", "String", "Int", "Bool"] 1520 1521 if type_key == "FrameType": 1522 frame = obj[type_key] 1523 assert type(frame) is dict 1524 assert "width" in frame 1525 assert "height" in frame 1526 assert "format" in frame 1527 assert type(frame["width"]) is int 1528 assert type(frame["height"]) is int 1529 assert frame["format"] == 2 # AV_PIX_FMT_RGB24 1530 return UDFFrameType(frame["width"], frame["height"], "rgb24") 1531 elif type_key == "String": 1532 assert type(obj[type_key]) is str 1533 return obj[type_key] 1534 elif type_key == "Int": 1535 assert type(obj[type_key]) is int 1536 return obj[type_key] 1537 elif type_key == "Bool": 1538 assert 
type(obj[type_key]) is bool 1539 return obj[type_key] 1540 else: 1541 assert False, f"Unknown type: {type_key}" 1542 1543 def _deser_filter(self, obj): 1544 assert type(obj) is dict 1545 keys = list(obj.keys()) 1546 assert len(keys) == 1 1547 type_key = keys[0] 1548 assert type_key in ["Frame", "String", "Int", "Bool"] 1549 1550 if type_key == "Frame": 1551 frame = obj[type_key] 1552 assert type(frame) is dict 1553 assert "data" in frame 1554 assert "width" in frame 1555 assert "height" in frame 1556 assert "format" in frame 1557 assert type(frame["width"]) is int 1558 assert type(frame["height"]) is int 1559 assert frame["format"] == "rgb24" 1560 assert type(frame["data"]) is bytes 1561 1562 data = np.frombuffer(frame["data"], dtype=np.uint8) 1563 data = data.reshape(frame["height"], frame["width"], 3) 1564 return UDFFrame( 1565 data, UDFFrameType(frame["width"], frame["height"], "rgb24") 1566 ) 1567 elif type_key == "String": 1568 assert type(obj[type_key]) is str 1569 return obj[type_key] 1570 elif type_key == "Int": 1571 assert type(obj[type_key]) is int 1572 return obj[type_key] 1573 elif type_key == "Bool": 1574 assert type(obj[type_key]) is bool 1575 return obj[type_key] 1576 else: 1577 assert False, f"Unknown type: {type_key}" 1578 1579 def _host(self, socket_path: str): 1580 if os.path.exists(socket_path): 1581 os.remove(socket_path) 1582 1583 # start listening on the socket 1584 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 1585 sock.bind(socket_path) 1586 sock.listen(1) 1587 1588 while True: 1589 # accept incoming connection 1590 connection, client_address = sock.accept() 1591 thread = threading.Thread( 1592 target=self._handle_connection, args=(connection,) 1593 ) 1594 thread.start() 1595 1596 def __del__(self): 1597 if self._socket_path is not None: 1598 self._p.terminate() 1599 if os.path.exists(self._socket_path): 1600 # it's possible the process hasn't even created the socket yet 1601 os.remove(self._socket_path) 1602 1603 1604class 
UDFFrameType: 1605 """ 1606 Frame type for use in UDFs. 1607 """ 1608 1609 def __init__(self, width: int, height: int, pix_fmt: str): 1610 assert type(width) is int 1611 assert type(height) is int 1612 assert type(pix_fmt) is str 1613 1614 self._width = width 1615 self._height = height 1616 self._pix_fmt = pix_fmt 1617 1618 def width(self): 1619 return self._width 1620 1621 def height(self): 1622 return self._height 1623 1624 def pix_fmt(self): 1625 return self._pix_fmt 1626 1627 def _response_ser(self): 1628 return { 1629 "frame_type": { 1630 "width": self._width, 1631 "height": self._height, 1632 "format": 2, # AV_PIX_FMT_RGB24 1633 } 1634 } 1635 1636 def __repr__(self): 1637 return f"FrameType<{self._width}x{self._height}, {self._pix_fmt}>" 1638 1639 1640class UDFFrame: 1641 """A symbolic reference to a frame for use in UDFs.""" 1642 1643 def __init__(self, data: np.ndarray, f_type: UDFFrameType): 1644 assert type(data) is np.ndarray 1645 assert type(f_type) is UDFFrameType 1646 1647 # We only support RGB24 for now 1648 assert data.dtype == np.uint8 1649 assert data.shape[2] == 3 1650 1651 # check type matches 1652 assert data.shape[0] == f_type.height() 1653 assert data.shape[1] == f_type.width() 1654 assert f_type.pix_fmt() == "rgb24" 1655 1656 self._data = data 1657 self._f_type = f_type 1658 1659 def data(self): 1660 return self._data 1661 1662 def frame_type(self): 1663 return self._f_type 1664 1665 def _response_ser(self): 1666 return { 1667 "frame": { 1668 "data": self._data.tobytes(), 1669 "width": self._f_type.width(), 1670 "height": self._f_type.height(), 1671 "format": "rgb24", 1672 } 1673 } 1674 1675 def __repr__(self): 1676 return f"Frame<{self._f_type.width()}x{self._f_type.height()}, {self._f_type.pix_fmt()}>" 1677 1678 1679def _run_udf_host(udf: UDF, socket_path: str): 1680 udf._host(socket_path)
class IgniSource:
    """A source known to an Igni server, built from the server's JSON description."""

    def __init__(self, id: str, src):
        self._name = id
        self._fmt = {key: src[key] for key in ("width", "height", "pix_fmt")}
        # Timestamps arrive as [numerator, denominator] pairs.
        self._ts = [Fraction(pair[0], pair[1]) for pair in src["ts"]]
        self.iloc = _SourceILoc(self)

    def id(self) -> str:
        """Return the source's id."""
        return self._name

    def fmt(self):
        """Return a copy of the width/height/pix_fmt dict."""
        return dict(self._fmt)

    def ts(self) -> list[Fraction]:
        """Return a copy of the timestamp list."""
        return list(self._ts)

    def __len__(self):
        return len(self._ts)

    def __getitem__(self, idx):
        """Return a frame expression addressed by timestamp (a ``Fraction``)."""
        if type(idx) is Fraction:
            return SourceExpr(self, idx, False)
        raise Exception("Source index must be a Fraction")

    def __repr__(self):
        return f"IgniSource({self._name})"
class IgniSpec:
    """A spec hosted on an Igni server, built from the server's JSON description."""

    def __init__(self, id: str, src):
        self._id = id
        self._fmt = {key: src[key] for key in ("width", "height", "pix_fmt")}
        self._vod_endpoint = src["vod_endpoint"]
        # hls.js is addressed at the root of the VOD endpoint's scheme and host.
        url_parts = urlparse(self._vod_endpoint)
        self._hls_js_url = f"{url_parts.scheme}://{url_parts.netloc}/hls.js"

    def id(self) -> str:
        """Return the spec's id."""
        return self._id

    def play(self, *args, **kwargs):
        """Hand the spec's playlist, status and hls.js URLs to ``_play``."""
        playlist_url = f"{self._vod_endpoint}playlist.m3u8"
        progress_url = f"{self._vod_endpoint}status"
        return _play(
            self._id,
            playlist_url,
            self._hls_js_url,
            *args,
            **kwargs,
            status_url=progress_url,
        )
def __init__(self, id: str, src):
    """Store the spec id, its output format and the URLs derived from ``vod_endpoint``."""
    self._id = id
    self._fmt = {key: src[key] for key in ("width", "height", "pix_fmt")}
    self._vod_endpoint = src["vod_endpoint"]
    # hls.js is addressed at the root of the VOD endpoint's scheme and host.
    url_parts = urlparse(self._vod_endpoint)
    self._hls_js_url = f"{url_parts.scheme}://{url_parts.netloc}/hls.js"
class IgniServer:
    """Client for an Igni server's HTTP API, authenticated with a bearer API key."""

    def __init__(self, endpoint: str, api_key: str):
        """Validate ``endpoint``, open a session and check the key against ``/auth``."""
        if not endpoint.startswith(("http://", "https://")):
            raise Exception("Endpoint must start with http:// or https://")
        if endpoint.endswith("/"):
            raise Exception("Endpoint must not end with /")
        self._endpoint = endpoint

        self._api_key = api_key
        self._session = requests.Session()
        self._session.headers.update({"Authorization": f"Bearer {self._api_key}"})
        auth = {"Authorization": f"Bearer {self._api_key}"}
        reply = self._session.get(f"{self._endpoint}/auth", headers=auth)
        if not reply.ok:
            raise Exception(reply.text)
        body = reply.json()
        assert body["status"] == "ok"

    def get_source(self, id: str) -> IgniSource:
        """Fetch the source with the given id."""
        assert type(id) is str
        auth = {"Authorization": f"Bearer {self._api_key}"}
        reply = self._session.get(f"{self._endpoint}/source/{id}", headers=auth)
        if not reply.ok:
            raise Exception(reply.text)
        info = reply.json()
        return IgniSource(info["id"], info)

    def list_sources(self) -> list[str]:
        """Return the decoded JSON reply of ``GET /source``."""
        auth = {"Authorization": f"Bearer {self._api_key}"}
        reply = self._session.get(f"{self._endpoint}/source", headers=auth)
        if not reply.ok:
            raise Exception(reply.text)
        return reply.json()

    def delete_source(self, id: str):
        """Delete the source with the given id."""
        assert type(id) is str
        auth = {"Authorization": f"Bearer {self._api_key}"}
        reply = self._session.delete(f"{self._endpoint}/source/{id}", headers=auth)
        if not reply.ok:
            raise Exception(reply.text)
        body = reply.json()
        assert body["status"] == "ok"

    def search_source(
        self, name, stream_idx, storage_service, storage_config
    ) -> list[str]:
        """Return the decoded JSON reply of ``POST /source/search`` for these parameters."""
        assert type(name) is str
        assert type(stream_idx) is int
        assert type(storage_service) is str
        assert type(storage_config) is dict
        for cfg_key, cfg_val in storage_config.items():
            assert type(cfg_key) is str
            assert type(cfg_val) is str
        payload = {
            "name": name,
            "stream_idx": stream_idx,
            "storage_service": storage_service,
            "storage_config": storage_config,
        }
        auth = {"Authorization": f"Bearer {self._api_key}"}
        reply = self._session.post(
            f"{self._endpoint}/source/search", json=payload, headers=auth
        )
        if not reply.ok:
            raise Exception(reply.text)
        return reply.json()

    def create_source(
        self, name, stream_idx, storage_service, storage_config
    ) -> IgniSource:
        """Create a source on the server and return it via ``get_source``."""
        assert type(name) is str
        assert type(stream_idx) is int
        assert type(storage_service) is str
        assert type(storage_config) is dict
        for cfg_key, cfg_val in storage_config.items():
            assert type(cfg_key) is str
            assert type(cfg_val) is str
        payload = {
            "name": name,
            "stream_idx": stream_idx,
            "storage_service": storage_service,
            "storage_config": storage_config,
        }
        auth = {"Authorization": f"Bearer {self._api_key}"}
        reply = self._session.post(
            f"{self._endpoint}/source", json=payload, headers=auth
        )
        if not reply.ok:
            raise Exception(reply.text)
        body = reply.json()
        assert body["status"] == "ok"
        return self.get_source(body["id"])

    def source(self, name, stream_idx, storage_service, storage_config) -> IgniSource:
        """Convenience function for accessing sources.

        Tries to find a source with the given name, stream_idx, storage_service, and storage_config.
        If no source is found, creates a new source with the given parameters.
        """

        found = self.search_source(name, stream_idx, storage_service, storage_config)
        if len(found) != 0:
            return self.get_source(found[0])
        return self.create_source(name, stream_idx, storage_service, storage_config)

    def get_spec(self, id: str) -> IgniSpec:
        """Fetch the spec with the given id."""
        assert type(id) is str
        auth = {"Authorization": f"Bearer {self._api_key}"}
        reply = self._session.get(f"{self._endpoint}/spec/{id}", headers=auth)
        if not reply.ok:
            raise Exception(reply.text)
        info = reply.json()
        return IgniSpec(info["id"], info)

    def list_specs(self) -> list[str]:
        """Return the decoded JSON reply of ``GET /spec``."""
        auth = {"Authorization": f"Bearer {self._api_key}"}
        reply = self._session.get(f"{self._endpoint}/spec", headers=auth)
        if not reply.ok:
            raise Exception(reply.text)
        return reply.json()

    def create_spec(
        self,
        width,
        height,
        pix_fmt,
        vod_segment_length,
        frame_rate,
        ready_hook=None,
        steer_hook=None,
        ttl=None,
    ) -> IgniSpec:
        """Create a spec on the server and return it via ``get_spec``.

        ``vod_segment_length`` and ``frame_rate`` must be ``Fraction``s; they
        are sent as ``[numerator, denominator]`` pairs.
        """
        assert type(width) is int
        assert type(height) is int
        assert type(pix_fmt) is str
        assert type(vod_segment_length) is Fraction
        assert type(frame_rate) is Fraction
        assert type(ready_hook) is str or ready_hook is None
        assert type(steer_hook) is str or steer_hook is None
        assert ttl is None or type(ttl) is int

        segment_pair = [vod_segment_length.numerator, vod_segment_length.denominator]
        rate_pair = [frame_rate.numerator, frame_rate.denominator]
        payload = {
            "width": width,
            "height": height,
            "pix_fmt": pix_fmt,
            "vod_segment_length": segment_pair,
            "frame_rate": rate_pair,
            "ready_hook": ready_hook,
            "steer_hook": steer_hook,
            "ttl": ttl,
        }
        auth = {"Authorization": f"Bearer {self._api_key}"}
        reply = self._session.post(f"{self._endpoint}/spec", json=payload, headers=auth)
        if not reply.ok:
            raise Exception(reply.text)
        body = reply.json()
        assert body["status"] == "ok"
        return self.get_spec(body["id"])

    def delete_spec(self, id: str):
        """Delete the spec with the given id."""
        assert type(id) is str
        auth = {"Authorization": f"Bearer {self._api_key}"}
        reply = self._session.delete(f"{self._endpoint}/spec/{id}", headers=auth)
        if not reply.ok:
            raise Exception(reply.text)
        body = reply.json()
        assert body["status"] == "ok"

    def push_spec_part(self, spec_id, pos, frames, terminal):
        """Push ``(timestamp, frame expression or None)`` tuples to a spec at ``pos``.

        ``spec_id`` may be a spec id string or an ``IgniSpec``.
        """
        if type(spec_id) is IgniSpec:
            spec_id = spec_id._id
        assert type(spec_id) is str
        assert type(pos) is int
        assert type(frames) is list
        assert type(terminal) is bool

        encoded_frames = []
        for entry in frames:
            assert type(entry) is tuple
            assert len(entry) == 2
            t, expr = entry
            assert type(t) is Fraction
            assert expr is None or type(expr) is SourceExpr or type(expr) is FilterExpr
            expr_json = expr._to_json_spec() if expr is not None else None
            encoded_frames.append([[t.numerator, t.denominator], expr_json])

        payload = {
            "pos": pos,
            "frames": encoded_frames,
            "terminal": terminal,
        }
        auth = {"Authorization": f"Bearer {self._api_key}"}
        reply = self._session.post(
            f"{self._endpoint}/spec/{spec_id}/part", json=payload, headers=auth
        )
        if not reply.ok:
            raise Exception(reply.text)
        body = reply.json()
        assert body["status"] == "ok"

    def push_spec_part_block(
        self, spec_id: str, pos, blocks, terminal, compression="gzip"
    ):
        """Push frame-expression blocks to a spec at ``pos``.

        Each block is JSON-encoded, gzip-compressed when ``compression`` is
        "gzip", and base64-encoded. ``spec_id`` may also be an ``IgniSpec``.
        """
        if type(spec_id) is IgniSpec:
            spec_id = spec_id._id
        assert type(spec_id) is str
        assert type(pos) is int
        assert type(blocks) is list
        assert type(terminal) is bool
        assert compression is None or compression == "gzip"

        encoded_blocks = []
        for blk in blocks:
            assert type(blk) is _FrameExpressionBlock
            blk_dict = blk.as_dict()
            n_frames = len(blk_dict["frame_exprs"])
            raw = json.dumps(blk_dict).encode("utf-8")
            if compression == "gzip":
                raw = gzip.compress(raw, 1)
            encoded_blocks.append(
                {
                    "frames": n_frames,
                    "compression": compression,
                    "body": base64.b64encode(raw).decode("utf-8"),
                }
            )

        payload = {
            "pos": pos,
            "terminal": terminal,
            "blocks": encoded_blocks,
        }
        auth = {"Authorization": f"Bearer {self._api_key}"}
        reply = self._session.post(
            f"{self._endpoint}/spec/{spec_id}/part_block", json=payload, headers=auth
        )
        if not reply.ok:
            raise Exception(reply.text)
        body = reply.json()
        assert body["status"] == "ok"

    def frame(self, width, height, pix_fmt, frame_expr, compression="gzip"):
        """Request a single frame expression via ``/frame`` and return the reply bytes.

        The reply body is gzip-decompressed when ``compression`` is "gzip".
        """
        assert type(frame_expr) is FilterExpr or type(frame_expr) is SourceExpr
        assert compression is None or compression in ["gzip"]
        feb = _FrameExpressionBlock()
        feb.insert_frame(frame_expr)

        raw = json.dumps(feb.as_dict()).encode("utf-8")
        if compression == "gzip":
            raw = gzip.compress(raw, 1)
        encoded = base64.b64encode(raw).decode("utf-8")
        payload = {
            "width": width,
            "height": height,
            "pix_fmt": pix_fmt,
            "compression": compression,
            "block": {
                "frames": 1,
                "compression": compression,
                "body": encoded,
            },
        }
        auth = {"Authorization": f"Bearer {self._api_key}"}
        reply = self._session.post(
            f"{self._endpoint}/frame", json=payload, headers=auth
        )
        if not reply.ok:
            raise Exception(reply.text)
        frame_bytes = reply.content
        assert type(frame_bytes) is bytes
        if compression == "gzip":
            frame_bytes = gzip.decompress(frame_bytes)
        return frame_bytes
def __init__(self, endpoint: str, api_key: str):
    """Validate ``endpoint``, open a session and check the key against ``/auth``."""
    if not endpoint.startswith(("http://", "https://")):
        raise Exception("Endpoint must start with http:// or https://")
    if endpoint.endswith("/"):
        raise Exception("Endpoint must not end with /")
    self._endpoint = endpoint

    self._api_key = api_key
    self._session = requests.Session()
    self._session.headers.update({"Authorization": f"Bearer {self._api_key}"})
    auth = {"Authorization": f"Bearer {self._api_key}"}
    reply = self._session.get(f"{self._endpoint}/auth", headers=auth)
    if not reply.ok:
        raise Exception(reply.text)
    body = reply.json()
    assert body["status"] == "ok"
def get_source(self, id: str) -> IgniSource:
    """Fetch the source with the given id."""
    assert type(id) is str
    auth = {"Authorization": f"Bearer {self._api_key}"}
    reply = self._session.get(f"{self._endpoint}/source/{id}", headers=auth)
    if not reply.ok:
        raise Exception(reply.text)
    info = reply.json()
    return IgniSource(info["id"], info)
def delete_source(self, id: str):
    """Delete the source with the given id."""
    assert type(id) is str
    auth = {"Authorization": f"Bearer {self._api_key}"}
    reply = self._session.delete(f"{self._endpoint}/source/{id}", headers=auth)
    if not reply.ok:
        raise Exception(reply.text)
    body = reply.json()
    assert body["status"] == "ok"
def search_source(
    self, name, stream_idx, storage_service, storage_config
) -> list[str]:
    """Return the decoded JSON reply of ``POST /source/search`` for these parameters."""
    assert type(name) is str
    assert type(stream_idx) is int
    assert type(storage_service) is str
    assert type(storage_config) is dict
    for cfg_key, cfg_val in storage_config.items():
        assert type(cfg_key) is str
        assert type(cfg_val) is str
    payload = {
        "name": name,
        "stream_idx": stream_idx,
        "storage_service": storage_service,
        "storage_config": storage_config,
    }
    auth = {"Authorization": f"Bearer {self._api_key}"}
    reply = self._session.post(
        f"{self._endpoint}/source/search", json=payload, headers=auth
    )
    if not reply.ok:
        raise Exception(reply.text)
    return reply.json()
def create_source(
    self, name, stream_idx, storage_service, storage_config
) -> IgniSource:
    """Create a source on the server and return it via ``get_source``."""
    assert type(name) is str
    assert type(stream_idx) is int
    assert type(storage_service) is str
    assert type(storage_config) is dict
    for cfg_key, cfg_val in storage_config.items():
        assert type(cfg_key) is str
        assert type(cfg_val) is str
    payload = {
        "name": name,
        "stream_idx": stream_idx,
        "storage_service": storage_service,
        "storage_config": storage_config,
    }
    auth = {"Authorization": f"Bearer {self._api_key}"}
    reply = self._session.post(
        f"{self._endpoint}/source", json=payload, headers=auth
    )
    if not reply.ok:
        raise Exception(reply.text)
    body = reply.json()
    assert body["status"] == "ok"
    return self.get_source(body["id"])
def source(self, name, stream_idx, storage_service, storage_config) -> IgniSource:
    """Convenience function for accessing sources.

    Tries to find a source with the given name, stream_idx, storage_service, and storage_config.
    If no source is found, creates a new source with the given parameters.
    """

    found = self.search_source(name, stream_idx, storage_service, storage_config)
    if len(found) != 0:
        return self.get_source(found[0])
    return self.create_source(name, stream_idx, storage_service, storage_config)
Convenience function for accessing sources.
Tries to find a source with the given name, stream_idx, storage_service, and storage_config. If no source is found, creates a new source with the given parameters.
def get_spec(self, id: str) -> IgniSpec:
    """Fetch the spec with the given id."""
    assert type(id) is str
    auth = {"Authorization": f"Bearer {self._api_key}"}
    reply = self._session.get(f"{self._endpoint}/spec/{id}", headers=auth)
    if not reply.ok:
        raise Exception(reply.text)
    info = reply.json()
    return IgniSpec(info["id"], info)
def create_spec(
    self,
    width,
    height,
    pix_fmt,
    vod_segment_length,
    frame_rate,
    ready_hook=None,
    steer_hook=None,
    ttl=None,
) -> IgniSpec:
    """Create a spec on the server and return it via ``get_spec``.

    ``vod_segment_length`` and ``frame_rate`` must be ``Fraction``s; they
    are sent as ``[numerator, denominator]`` pairs.
    """
    assert type(width) is int
    assert type(height) is int
    assert type(pix_fmt) is str
    assert type(vod_segment_length) is Fraction
    assert type(frame_rate) is Fraction
    assert type(ready_hook) is str or ready_hook is None
    assert type(steer_hook) is str or steer_hook is None
    assert ttl is None or type(ttl) is int

    segment_pair = [vod_segment_length.numerator, vod_segment_length.denominator]
    rate_pair = [frame_rate.numerator, frame_rate.denominator]
    payload = {
        "width": width,
        "height": height,
        "pix_fmt": pix_fmt,
        "vod_segment_length": segment_pair,
        "frame_rate": rate_pair,
        "ready_hook": ready_hook,
        "steer_hook": steer_hook,
        "ttl": ttl,
    }
    auth = {"Authorization": f"Bearer {self._api_key}"}
    reply = self._session.post(f"{self._endpoint}/spec", json=payload, headers=auth)
    if not reply.ok:
        raise Exception(reply.text)
    body = reply.json()
    assert body["status"] == "ok"
    return self.get_spec(body["id"])
def delete_spec(self, id: str):
    """Delete the spec with the given id."""
    assert type(id) is str
    auth = {"Authorization": f"Bearer {self._api_key}"}
    reply = self._session.delete(f"{self._endpoint}/spec/{id}", headers=auth)
    if not reply.ok:
        raise Exception(reply.text)
    body = reply.json()
    assert body["status"] == "ok"
def push_spec_part(self, spec_id, pos, frames, terminal):
    """Push ``(timestamp, frame expression or None)`` tuples to a spec at ``pos``.

    ``spec_id`` may be a spec id string or an ``IgniSpec``.
    """
    if type(spec_id) is IgniSpec:
        spec_id = spec_id._id
    assert type(spec_id) is str
    assert type(pos) is int
    assert type(frames) is list
    assert type(terminal) is bool

    encoded_frames = []
    for entry in frames:
        assert type(entry) is tuple
        assert len(entry) == 2
        t, expr = entry
        assert type(t) is Fraction
        assert expr is None or type(expr) is SourceExpr or type(expr) is FilterExpr
        expr_json = expr._to_json_spec() if expr is not None else None
        encoded_frames.append([[t.numerator, t.denominator], expr_json])

    payload = {
        "pos": pos,
        "frames": encoded_frames,
        "terminal": terminal,
    }
    auth = {"Authorization": f"Bearer {self._api_key}"}
    reply = self._session.post(
        f"{self._endpoint}/spec/{spec_id}/part", json=payload, headers=auth
    )
    if not reply.ok:
        raise Exception(reply.text)
    body = reply.json()
    assert body["status"] == "ok"
def push_spec_part_block(
    self, spec_id: str, pos, blocks, terminal, compression="gzip"
):
    """Push frame-expression blocks to a spec at ``pos``.

    Each block is JSON-encoded, gzip-compressed when ``compression`` is
    "gzip", and base64-encoded. ``spec_id`` may also be an ``IgniSpec``.
    """
    if type(spec_id) is IgniSpec:
        spec_id = spec_id._id
    assert type(spec_id) is str
    assert type(pos) is int
    assert type(blocks) is list
    assert type(terminal) is bool
    assert compression is None or compression == "gzip"

    encoded_blocks = []
    for blk in blocks:
        assert type(blk) is _FrameExpressionBlock
        blk_dict = blk.as_dict()
        n_frames = len(blk_dict["frame_exprs"])
        raw = json.dumps(blk_dict).encode("utf-8")
        if compression == "gzip":
            raw = gzip.compress(raw, 1)
        encoded_blocks.append(
            {
                "frames": n_frames,
                "compression": compression,
                "body": base64.b64encode(raw).decode("utf-8"),
            }
        )

    payload = {
        "pos": pos,
        "terminal": terminal,
        "blocks": encoded_blocks,
    }
    auth = {"Authorization": f"Bearer {self._api_key}"}
    reply = self._session.post(
        f"{self._endpoint}/spec/{spec_id}/part_block", json=payload, headers=auth
    )
    if not reply.ok:
        raise Exception(reply.text)
    body = reply.json()
    assert body["status"] == "ok"
def frame(self, width, height, pix_fmt, frame_expr, compression="gzip"):
    """
    Ask the server to render a single frame expression and return its bytes.

    The expression is wrapped in a one-frame `_FrameExpressionBlock`, encoded
    (JSON, optional gzip, base64) and POSTed to `{endpoint}/frame`. When
    `compression` is `"gzip"` the response body is gunzipped before returning.

    compression: `"gzip"` or `None`.

    Raises `Exception` carrying the response text on a non-OK response.
    """
    assert type(frame_expr) is FilterExpr or type(frame_expr) is SourceExpr
    assert compression is None or compression in ["gzip"]
    gzipped = compression == "gzip"

    single = _FrameExpressionBlock()
    single.insert_frame(frame_expr)
    encoded = json.dumps(single.as_dict()).encode("utf-8")
    if gzipped:
        encoded = gzip.compress(encoded, 1)

    payload = {
        "width": width,
        "height": height,
        "pix_fmt": pix_fmt,
        "compression": compression,
        "block": {
            "frames": 1,
            "compression": compression,
            "body": base64.b64encode(encoded).decode("utf-8"),
        },
    }
    reply = self._session.post(
        f"{self._endpoint}/frame",
        json=payload,
        headers={"Authorization": f"Bearer {self._api_key}"},
    )
    if not reply.ok:
        raise Exception(reply.text)
    raw = reply.content
    assert type(raw) is bytes
    return gzip.decompress(raw) if gzipped else raw
class YrdenSpec:
    """
    A video transformation specification.

    See https://ixlab.github.io/vidformer/concepts.html for more information.
    """

    def __init__(self, domain: list[Fraction], render, fmt: dict):
        """
        domain: the output timestamps, one `Fraction` per frame.
        render: callable `(t, i)` returning the frame expression for timestamp
            `t` at index `i` of the domain.
        fmt: dict with the output `width`, `height` and `pix_fmt`.
        """
        self._domain = domain
        self._render = render
        self._fmt = fmt

    def _repr_line(self, i):
        """Format domain entry `i` as `num/den => <frame expression>`."""
        t = self._domain[i]
        return f"{t.numerator}/{t.denominator} => {self._render(t, i)}"

    def __repr__(self):
        n = len(self._domain)
        if n <= 20:
            return "\n".join(self._repr_line(i) for i in range(n))
        # Long specs show the first and last ten frames. The tail must be
        # rendered with its real domain indices, not indices restarting at 0.
        head = [self._repr_line(i) for i in range(10)]
        tail = [self._repr_line(i) for i in range(n - 10, n)]
        return "\n".join(head + ["..."] + tail)

    def _sources(self):
        """Return the set of sources referenced by any frame of the spec."""
        found = set()
        for i, t in enumerate(self._domain):
            found |= self._render(t, i)._sources()
        return found

    def _to_json_spec(self):
        """
        Render every frame and return `(spec, sources, filters)`.

        `spec` is `{"frames": [[[num, den], expr_json], ...]}`, `sources` is the
        set of referenced sources and `filters` maps filter name to filter.
        """
        frames = []
        sources = set()
        filters = {}
        for i, t in enumerate(self._domain):
            frame_expr = self._render(t, i)
            sources |= frame_expr._sources()
            # In-place update; later frames override earlier entries, exactly
            # like rebuilding the dict with {**filters, **new} but linear.
            filters.update(frame_expr._filters())
            frames.append([[t.numerator, t.denominator], frame_expr._to_json_spec()])
        return {"frames": frames}, sources, filters

    @staticmethod
    def _encode_spec(spec):
        """Encode a JSON spec as base64(gzip(json)) text for the server."""
        raw = json.dumps(spec).encode("utf-8")
        packed = gzip.compress(raw, compresslevel=1)
        return base64.b64encode(packed).decode("utf-8")

    @staticmethod
    def _sources_json(sources):
        """Describe each source as the dict the server request expects."""
        return [
            {
                "name": s._name,
                "path": s._path,
                "stream": s._stream,
                "service": s._service.as_json() if s._service is not None else None,
            }
            for s in sources
        ]

    @staticmethod
    def _filters_json(filters):
        """Describe each filter as the dict the server request expects."""
        return {
            k: {
                "filter": v._func,
                "args": v._kwargs,
            }
            for k, v in filters.items()
        }

    def play(self, server, method="html", verbose=False):
        """
        Play the video live in the notebook.

        method selects the return value: `"link"` returns the HLS stream URL,
        `"player"` the player URL, `"iframe"` an IPython `IFrame`, `"html"` an
        IPython `HTML` player; any other value returns the player URL.
        """

        spec, sources, filters = self._to_json_spec()
        spec_b64 = self._encode_spec(spec)
        sources = self._sources_json(sources)
        filters = self._filters_json(filters)

        if verbose:
            print(f"Sending to server. Spec is {len(spec_b64)} bytes")

        resp = server._new(spec_b64, sources, filters, self._fmt)
        hls_video_url = resp["stream_url"]
        hls_player_url = resp["player_url"]
        namespace = resp["namespace"]
        hls_js_url = server.hls_js_url()

        if method == "link":
            return hls_video_url
        if method == "player":
            return hls_player_url
        if method == "iframe":
            from IPython.display import IFrame

            return IFrame(hls_player_url, width=1280, height=720)
        if method == "html":
            from IPython.display import HTML

            # We add a namespace to the video element to avoid conflicts with other videos
            html_code = f"""
<!DOCTYPE html>
<html>
<head>
    <title>HLS Video Player</title>
    <!-- Include hls.js library -->
    <script src="{hls_js_url}"></script>
</head>
<body>
    <!-- Video element -->
    <video id="video-{namespace}" controls width="640" height="360" autoplay></video>
    <script>
        var video = document.getElementById('video-{namespace}');
        var videoSrc = '{hls_video_url}';
        var hls = new Hls();
        hls.loadSource(videoSrc);
        hls.attachMedia(video);
        hls.on(Hls.Events.MANIFEST_PARSED, function() {{
            video.play();
        }});
    </script>
</body>
</html>
"""
            return HTML(data=html_code)
        else:
            return hls_player_url

    def load(self, server):
        """Register the spec with the server and return a `YrdenLoader` over it."""
        spec, sources, filters = self._to_json_spec()
        resp = server._new(
            self._encode_spec(spec),
            self._sources_json(sources),
            self._filters_json(filters),
            self._fmt,
        )
        namespace = resp["namespace"]
        return YrdenLoader(server, namespace, self._domain)

    def save(self, server, pth, encoder=None, encoder_opts=None, format=None):
        """
        Save the video to a file.

        encoder: optional encoder name (str).
        encoder_opts: optional dict of str -> str encoder options.
        Returns whatever `server._export` returns.
        """

        assert encoder is None or type(encoder) is str
        assert encoder_opts is None or type(encoder_opts) is dict
        if encoder_opts is not None:
            for k, v in encoder_opts.items():
                assert type(k) is str and type(v) is str

        spec, sources, filters = self._to_json_spec()
        resp = server._export(
            pth,
            self._encode_spec(spec),
            self._sources_json(sources),
            self._filters_json(filters),
            self._fmt,
            encoder,
            encoder_opts,
            format,
        )

        return resp

    def _vrod_bench(self, server):
        """Time spec creation, registration and first-segment fetch; returns a dict of seconds."""
        # NOTE(review): this passes a file path as the `spec` argument of
        # `server._new`, unlike play()/load() which pass the encoded spec --
        # confirm which server this benchmark targets.
        out = {}
        pth = "spec.json"
        start_t = time.time()
        with open(pth, "w") as outfile:
            spec, sources, filters = self._to_json_spec()
            outfile.write(json.dumps(spec))

            sources = self._sources_json(sources)
            filters = self._filters_json(filters)
        end_t = time.time()
        out["vrod_create_spec"] = end_t - start_t

        start = time.time()
        resp = server._new(pth, sources, filters, self._fmt)
        end = time.time()
        out["vrod_register"] = end - start

        stream_url = resp["stream_url"]
        first_segment = stream_url.replace("stream.m3u8", "segment-0.ts")

        start = time.time()
        r = requests.get(first_segment)
        r.raise_for_status()
        end = time.time()
        out["vrod_first_segment"] = end - start
        return out

    def _dve2_bench(self, server):
        """Time spec creation and export; returns a dict of seconds."""
        # NOTE(review): the `_export` call below passes six arguments and then
        # calls `raise_for_status()` on the result; `YrdenServer._export` in
        # this file takes eight arguments and returns parsed JSON. Left
        # unchanged -- confirm the intended server type before relying on it.
        pth = "spec.json"
        out = {}
        start_t = time.time()
        with open(pth, "w") as outfile:
            spec, sources, filters = self._to_json_spec()
            outfile.write(json.dumps(spec))

            sources = self._sources_json(sources)
            filters = self._filters_json(filters)
        end_t = time.time()
        out["dve2_create_spec"] = end_t - start_t

        start = time.time()
        resp = server._export(pth, sources, filters, self._fmt, None, None)
        resp.raise_for_status()
        end = time.time()
        out["dve2_exec"] = end - start
        return out
A video transformation specification.
See https://ixlab.github.io/vidformer/concepts.html for more information.
def play(self, server, method="html", verbose=False):
    """
    Play the video live in the notebook.

    The spec is rendered, encoded as base64(gzip(json)) and registered with
    `server`. `method` picks the return value: `"link"` -> stream URL,
    `"player"` -> player URL, `"iframe"` -> IPython `IFrame`, `"html"` ->
    IPython `HTML` player; anything else -> player URL.
    """
    spec_json, src_set, filter_map = self._to_json_spec()
    packed = gzip.compress(json.dumps(spec_json).encode("utf-8"), compresslevel=1)
    spec_b64 = base64.b64encode(packed).decode("utf-8")

    source_descs = []
    for src in src_set:
        service = None if src._service is None else src._service.as_json()
        source_descs.append(
            {
                "name": src._name,
                "path": src._path,
                "stream": src._stream,
                "service": service,
            }
        )

    filter_descs = {}
    for key, flt in filter_map.items():
        filter_descs[key] = {"filter": flt._func, "args": flt._kwargs}

    if verbose:
        print(f"Sending to server. Spec is {len(spec_b64)} bytes")

    created = server._new(spec_b64, source_descs, filter_descs, self._fmt)
    hls_video_url = created["stream_url"]
    hls_player_url = created["player_url"]
    namespace = created["namespace"]
    hls_js_url = server.hls_js_url()

    if method == "link":
        return hls_video_url
    if method == "iframe":
        from IPython.display import IFrame

        return IFrame(hls_player_url, width=1280, height=720)
    if method != "html":
        # Covers "player" as well as any unrecognized method.
        return hls_player_url

    from IPython.display import HTML

    # We add a namespace to the video element to avoid conflicts with other videos
    html_code = f"""
<!DOCTYPE html>
<html>
<head>
    <title>HLS Video Player</title>
    <!-- Include hls.js library -->
    <script src="{hls_js_url}"></script>
</head>
<body>
    <!-- Video element -->
    <video id="video-{namespace}" controls width="640" height="360" autoplay></video>
    <script>
        var video = document.getElementById('video-{namespace}');
        var videoSrc = '{hls_video_url}';
        var hls = new Hls();
        hls.loadSource(videoSrc);
        hls.attachMedia(video);
        hls.on(Hls.Events.MANIFEST_PARSED, function() {{
            video.play();
        }});
    </script>
</body>
</html>
"""
    return HTML(data=html_code)
Play the video live in the notebook.
def load(self, server):
    """
    Register the spec with `server` and return a `YrdenLoader` over it.

    The spec is rendered and sent as base64(gzip(json)); the loader is bound to
    the namespace the server returns and to this spec's domain.
    """
    spec_json, src_set, filter_map = self._to_json_spec()
    packed = gzip.compress(json.dumps(spec_json).encode("utf-8"), compresslevel=1)
    spec_b64 = base64.b64encode(packed).decode("utf-8")

    source_descs = []
    for src in src_set:
        service = None if src._service is None else src._service.as_json()
        source_descs.append(
            {
                "name": src._name,
                "path": src._path,
                "stream": src._stream,
                "service": service,
            }
        )

    filter_descs = {}
    for key, flt in filter_map.items():
        filter_descs[key] = {"filter": flt._func, "args": flt._kwargs}

    created = server._new(spec_b64, source_descs, filter_descs, self._fmt)
    return YrdenLoader(server, created["namespace"], self._domain)
def save(self, server, pth, encoder=None, encoder_opts=None, format=None):
    """
    Save the video to a file.

    encoder: optional encoder name (str).
    encoder_opts: optional dict mapping str option names to str values.
    format: forwarded unchanged to `server._export`.

    Returns whatever `server._export` returns.
    """
    assert encoder is None or type(encoder) is str
    assert encoder_opts is None or type(encoder_opts) is dict
    if encoder_opts is not None:
        for opt_name, opt_value in encoder_opts.items():
            assert type(opt_name) is str and type(opt_value) is str

    spec_json, src_set, filter_map = self._to_json_spec()
    packed = gzip.compress(json.dumps(spec_json).encode("utf-8"), compresslevel=1)
    spec_b64 = base64.b64encode(packed).decode("utf-8")

    source_descs = []
    for src in src_set:
        service = None if src._service is None else src._service.as_json()
        source_descs.append(
            {
                "name": src._name,
                "path": src._path,
                "stream": src._stream,
                "service": service,
            }
        )

    filter_descs = {}
    for key, flt in filter_map.items():
        filter_descs[key] = {"filter": flt._func, "args": flt._kwargs}

    return server._export(
        pth,
        spec_b64,
        source_descs,
        filter_descs,
        self._fmt,
        encoder,
        encoder_opts,
        format,
    )
Save the video to a file.
class YrdenLoader:
    """
    Indexable view over the raw frames of a spec registered with a server.

    `loader[i]` returns the bytes of frame `i`; `loader[a:b]` returns a list of
    per-frame byte strings. Frames are fetched through `server._raw`.
    """

    def __init__(self, server, namespace: str, domain):
        self._server = server
        self._namespace = namespace
        self._domain = domain

    def _chunk(self, start_i, end_i):
        """Fetch the raw bytes for frames `start_i`..`end_i` (both inclusive)."""
        return self._server._raw(self._namespace, start_i, end_i)

    def __len__(self):
        return len(self._domain)

    def _find_index_by_rational(self, value):
        """Return the domain index of timestamp `value`; raise ValueError if absent."""
        # Single scan: let index() signal absence instead of testing `in` first.
        try:
            return self._domain.index(value)
        except ValueError:
            raise ValueError(
                f"Rational timestamp {value} is not in the domain"
            ) from None

    def __getitem__(self, index):
        if isinstance(index, slice):
            # NOTE(review): index.step is never read, so a stepped slice
            # returns every frame in [start, stop).
            total = len(self._domain)
            start = index.start if index.start is not None else 0
            end = index.stop if index.stop is not None else total
            assert 0 <= start <= end <= total
            num_frames = end - start
            if num_frames == 0:
                # An empty slice needs no request; it previously divided by zero.
                return []
            all_bytes = self._chunk(start, end - 1)
            all_bytes_len = len(all_bytes)
            assert all_bytes_len % num_frames == 0
            # Frames are equally sized, so split the buffer evenly.
            frame_size = all_bytes_len // num_frames
            return [
                all_bytes[i * frame_size : (i + 1) * frame_size]
                for i in range(num_frames)
            ]
        elif isinstance(index, int):
            assert index >= 0 and index < len(self._domain)
            return self._chunk(index, index)
        else:
            raise TypeError(
                "Invalid argument type for iloc. Use a slice or an integer."
            )
class YrdenServer:
    """
    A connection to a Yrden server.

    A Yrden server is the main API for local use of vidformer.
    """

    def __init__(self, domain=None, port=None, bin=None, hls_prefix=None):
        """
        Connect to a Yrden server

        If `port` is provided, connect to the existing server at `domain:port`.
        Otherwise start a new server on localhost at a random port, using
        `bin`, or the `VIDFORMER_BIN` environment variable, or `vidformer-cli`
        (in that order).

        hls_prefix: optional string passed to the new server as `--hls-prefix`.

        Raises `Exception` if the server does not answer on its root URL.
        """

        self._domain = domain
        self._port = port
        self._proc = None
        if self._port is None:
            if bin is None:
                bin = os.getenv("VIDFORMER_BIN", "vidformer-cli")

            self._domain = "localhost"
            self._port = random.randint(49152, 65535)
            cmd = [bin, "yrden", "--port", str(self._port)]
            if _in_notebook:
                # We need to print the URL in the notebook
                # This is a trick to get VS Code to forward the port
                cmd += ["--print-url"]

            if hls_prefix is not None:
                if type(hls_prefix) is not str:
                    raise Exception("hls_prefix must be a string")
                cmd += ["--hls-prefix", hls_prefix]

            self._proc = subprocess.Popen(cmd)

        version = _wait_for_url(f"http://{self._domain}:{self._port}/")
        if version is None:
            # Don't leave a server we spawned running behind a failed constructor.
            self._stop_proc()
            raise Exception("Failed to connect to server")

        expected_version = f"vidformer-yrden v{__version__}"
        if version != expected_version:
            print(
                f"Warning: Expected version `{expected_version}`, got `{version}`. API may not be compatible!"
            )

    def _stop_proc(self):
        """Kill and reap the server process this object started, if any."""
        proc = getattr(self, "_proc", None)
        if proc is not None:
            proc.kill()
            # wait() reaps the child so it does not linger as a zombie.
            proc.wait()
            self._proc = None

    def _source(self, name: str, path: str, stream: int, service):
        """Register a source; returns the server's JSON with `ts` converted to Fractions."""
        r = requests.post(
            f"http://{self._domain}:{self._port}/source",
            json={
                "name": name,
                "path": path,
                "stream": stream,
                "service": service.as_json() if service is not None else None,
            },
        )
        if not r.ok:
            raise Exception(r.text)

        resp = r.json()
        # Timestamps arrive as [numerator, denominator] pairs.
        resp["ts"] = [Fraction(x[0], x[1]) for x in resp["ts"]]
        return resp

    def _new(self, spec, sources, filters, fmt):
        """POST a spec to `/new` and return the parsed JSON response."""
        req = {
            "spec": spec,
            "sources": sources,
            "filters": filters,
            "width": fmt["width"],
            "height": fmt["height"],
            "pix_fmt": fmt["pix_fmt"],
        }

        r = requests.post(f"http://{self._domain}:{self._port}/new", json=req)
        if not r.ok:
            raise Exception(r.text)

        return r.json()

    def _export(self, pth, spec, sources, filters, fmt, encoder, encoder_opts, format):
        """POST an export request to `/export` and return the parsed JSON response."""
        req = {
            "spec": spec,
            "sources": sources,
            "filters": filters,
            "width": fmt["width"],
            "height": fmt["height"],
            "pix_fmt": fmt["pix_fmt"],
            "output_path": pth,
            "encoder": encoder,
            "encoder_opts": encoder_opts,
            "format": format,
        }

        r = requests.post(f"http://{self._domain}:{self._port}/export", json=req)
        if not r.ok:
            raise Exception(r.text)

        return r.json()

    def _raw(self, namespace, start_i, end_i):
        """GET the raw bytes served at `/{namespace}/raw/{start_i}-{end_i}`."""
        r = requests.get(
            f"http://{self._domain}:{self._port}/{namespace}/raw/{start_i}-{end_i}"
        )
        if not r.ok:
            raise Exception(r.text)
        return r.content

    def hls_js_url(self):
        """Return the link to the yrden-hosted hls.js file"""
        return f"http://{self._domain}:{self._port}/hls.js"

    def __del__(self):
        self._stop_proc()
A connection to a Yrden server.
A Yrden server is the main API for local use of vidformer.
def __init__(self, domain=None, port=None, bin=None, hls_prefix=None):
    """
    Connect to a Yrden server

    If `port` is provided, connect to the existing server at `domain:port`.
    Otherwise start a new server on localhost at a random port, using `bin`,
    or the `VIDFORMER_BIN` environment variable, or `vidformer-cli` (in that
    order).

    hls_prefix: optional string passed to the new server as `--hls-prefix`.

    Raises `Exception` if the server does not answer on its root URL.
    """

    self._domain = domain
    self._port = port
    self._proc = None
    if self._port is None:
        if bin is None:
            bin = os.getenv("VIDFORMER_BIN", "vidformer-cli")

        self._domain = "localhost"
        self._port = random.randint(49152, 65535)
        cmd = [bin, "yrden", "--port", str(self._port)]
        if _in_notebook:
            # We need to print the URL in the notebook
            # This is a trick to get VS Code to forward the port
            cmd += ["--print-url"]

        if hls_prefix is not None:
            if type(hls_prefix) is not str:
                raise Exception("hls_prefix must be a string")
            cmd += ["--hls-prefix", hls_prefix]

        self._proc = subprocess.Popen(cmd)

    version = _wait_for_url(f"http://{self._domain}:{self._port}/")
    if version is None:
        if self._proc is not None:
            # Don't leave a server we spawned running behind a failed
            # constructor; wait() reaps it so no zombie is left either.
            self._proc.kill()
            self._proc.wait()
            self._proc = None
        raise Exception("Failed to connect to server")

    expected_version = f"vidformer-yrden v{__version__}"
    if version != expected_version:
        print(
            f"Warning: Expected version `{expected_version}`, got `{version}`. API may not be compatible!"
        )
Connect to a Yrden server
Can either connect to an existing server, if domain and port are provided, or start a new server using the provided binary.
If no port is provided, a new server is started on localhost; if no binary is provided either, the `VIDFORMER_BIN` environment variable is used, falling back to `vidformer-cli`.
class YrdenSource:
    """A video source."""

    def __init__(
        self, server: YrdenServer, name: str, path: str, stream: int, service=None
    ):
        """
        Register the source with `server` and cache the server's description.

        When no `service` is given and `path` is an http(s) URL, the URL is
        split into an `http` storage service (endpoint = scheme and host) and
        a path relative to it.
        """
        if service is None:
            # e.g. "https://f.dominik.win/data/dve2/tos_720p.mp4" becomes
            # endpoint "https://f.dominik.win" + path "data/dve2/tos_720p.mp4";
            # the root parameter is not used in this case.
            url_parts = re.match(r"(http|https)://([^/]+)(.*)", path)
            if url_parts is not None:
                scheme, host, remainder = url_parts.groups()
                # remove leading slash
                path = remainder[1:] if remainder.startswith("/") else remainder
                service = YrdenStorageService("http", endpoint=f"{scheme}://{host}")

        self._server = server
        self._name = name
        self._path = path
        self._stream = stream
        self._service = service

        self.iloc = _SourceILoc(self)

        self._src = server._source(name, path, stream, service)

    def fmt(self):
        """Return the source's `width`, `height` and `pix_fmt` as a dict."""
        info = self._src
        return {key: info[key] for key in ("width", "height", "pix_fmt")}

    def ts(self):
        """Return the source's frame timestamps."""
        return self._src["ts"]

    def __len__(self):
        return len(self._src["ts"])

    def __getitem__(self, idx):
        """Return the expression for the frame at timestamp `idx` (a Fraction)."""
        if type(idx) is Fraction:
            return SourceExpr(self, idx, False)
        raise Exception("Source index must be a Fraction")

    def play(self, *args, **kwargs):
        """Play the video live in the notebook."""

        # Identity spec: every output frame is the source frame at the same time.
        identity = YrdenSpec(self.ts(), lambda t, _i: self[t], self.fmt())
        return identity.play(*args, **kwargs)
A video source.
def __init__(
    self, server: YrdenServer, name: str, path: str, stream: int, service=None
):
    """
    Register the source with `server` and cache the server's description.

    When no `service` is given and `path` is an http(s) URL, the URL is split
    into an `http` storage service (endpoint = scheme and host) and a path
    relative to it.
    """
    if service is None:
        # e.g. "https://f.dominik.win/data/dve2/tos_720p.mp4" becomes
        # endpoint "https://f.dominik.win" + path "data/dve2/tos_720p.mp4";
        # the root parameter is not used in this case.
        url_parts = re.match(r"(http|https)://([^/]+)(.*)", path)
        if url_parts is not None:
            scheme, host, remainder = url_parts.groups()
            # remove leading slash
            path = remainder[1:] if remainder.startswith("/") else remainder
            service = YrdenStorageService("http", endpoint=f"{scheme}://{host}")

    self._server = server
    self._name = name
    self._path = path
    self._stream = stream
    self._service = service

    self.iloc = _SourceILoc(self)

    self._src = server._source(name, path, stream, service)
def play(self, *args, **kwargs):
    """Play the video live in the notebook."""

    # Identity spec: every output frame is the source frame at the same time.
    identity = YrdenSpec(self.ts(), lambda t, _i: self[t], self.fmt())
    return identity.play(*args, **kwargs)
Play the video live in the notebook.
class YrdenStorageService:
    """A named storage service plus its string-valued configuration."""

    def __init__(self, service: str, **kwargs):
        """
        service: the service name.
        kwargs: configuration entries; every value must be a string.

        Raises `Exception` if the name or any configuration value is not a str.
        """
        if type(service) is not str:
            raise Exception("Service name must be a string")
        self._service = service
        for option, setting in kwargs.items():
            if type(setting) is not str:
                raise Exception(f"Value of {option} must be a string")
        self._config = kwargs

    def as_json(self):
        """Return the JSON-serializable description of the service."""
        return dict(service=self._service, config=self._config)

    def __repr__(self):
        return "{}(config={})".format(self._service, self._config)
def __init__(self, service: str, **kwargs):
    """
    service: the service name.
    kwargs: configuration entries; every value must be a string.

    Raises `Exception` if the name or any configuration value is not a str.
    """
    if type(service) is not str:
        raise Exception("Service name must be a string")
    self._service = service
    for option, setting in kwargs.items():
        if type(setting) is not str:
            raise Exception(f"Value of {option} must be a string")
    self._config = kwargs
class SourceExpr:
    """
    A reference to one frame of a source.

    The frame is addressed either by timestamp (`idx` is read through its
    `numerator`/`denominator`) or, when `is_iloc` is true, by integer position.
    """

    def __init__(self, source, idx, is_iloc):
        self._source = source
        self._idx = idx
        self._is_iloc = is_iloc

    def __repr__(self):
        accessor = ".iloc" if self._is_iloc else ""
        return f"{self._source._name}{accessor}[{self._idx}]"

    def _to_json_spec(self):
        """Return the JSON form of this frame reference."""
        if self._is_iloc:
            index = {"ILoc": int(self._idx)}
        else:
            index = {"T": [self._idx.numerator, self._idx.denominator]}
        return {"Source": {"video": self._source._name, "index": index}}

    def _sources(self):
        """Return the set containing the single referenced source."""
        return {self._source}

    def _filters(self):
        """A bare source reference uses no filters."""
        return {}
class Filter:
    """A video filter."""

    def __init__(self, name: str, tl_func=None, **kwargs):
        """
        name: the filter's display name.
        tl_func: the top-level function that actually implements the filter;
            defaults to `name`.
        kwargs: filter infrastructure arguments (not invocation arguments);
            every value must be a string.

        Raises `Exception` if a keyword value is not a str.
        """
        self._name = name

        # tl_func is the true implementation, not just a pretty name
        self._func = name if tl_func is None else tl_func

        for option, setting in kwargs.items():
            if type(setting) is not str:
                raise Exception(f"Value of {option} must be a string")
        self._kwargs = kwargs

    def __call__(self, *args, **kwargs):
        """Apply the filter to arguments, producing a `FilterExpr`."""
        return FilterExpr(self, args, kwargs)
A video filter.
def __init__(self, name: str, tl_func=None, **kwargs):
    """
    name: the filter's display name.
    tl_func: the top-level function that actually implements the filter;
        defaults to `name`.
    kwargs: filter infrastructure arguments (not invocation arguments); every
        value must be a string.

    Raises `Exception` if a keyword value is not a str.
    """
    self._name = name

    # tl_func is the true implementation, not just a pretty name
    self._func = name if tl_func is None else tl_func

    for option, setting in kwargs.items():
        if type(setting) is not str:
            raise Exception(f"Value of {option} must be a string")
    self._kwargs = kwargs
class FilterExpr:
    """A filter applied to positional and keyword arguments."""

    def __init__(self, filter: Filter, args, kwargs):
        self._filter = filter
        self._args = args
        self._kwargs = kwargs

    def __repr__(self):
        def show(value):
            # Strings are shown double-quoted, everything else via str().
            return f'"{value}"' if type(value) is str else str(value)

        rendered = [show(arg) for arg in self._args]
        rendered.extend(f"{key}={show(value)}" for key, value in self._kwargs.items())
        return f"{self._filter._name}({', '.join(rendered)})"

    def _to_json_spec(self):
        """Return the JSON form of this filter application."""
        return {
            "Filter": {
                "name": self._filter._name,
                "args": [_json_arg(arg) for arg in self._args],
                "kwargs": {key: _json_arg(value) for key, value in self._kwargs.items()},
            }
        }

    def _sources(self):
        """Return every source referenced by expression-valued arguments."""
        found = set()
        for operands in (self._args, self._kwargs.values()):
            for operand in operands:
                if type(operand) is FilterExpr or type(operand) is SourceExpr:
                    found = found.union(operand._sources())
        return found

    def _filters(self):
        """Return a name -> filter map for this filter and any nested filter expressions."""
        found = {self._filter._name: self._filter}
        for operands in (self._args, self._kwargs.values()):
            for operand in operands:
                if type(operand) is FilterExpr:
                    found = {**found, **operand._filters()}
        return found
class UDF:
    """
    User-defined filter superclass

    Subclasses implement `filter` and `filter_type`. `into_filter` starts a
    host process that serves the UDF over a Unix socket, exchanging msgpack
    messages prefixed with a 4-byte big-endian length.
    """

    def __init__(self, name: str):
        self._name = name
        self._socket_path = None
        self._p = None

    def filter(self, *args, **kwargs):
        """Compute the output frame; must return a `UDFFrame` with pix_fmt 'rgb24'."""
        raise Exception("User must implement the filter method")

    def filter_type(self, *args, **kwargs):
        """Compute the output frame type; must return a `UDFFrameType` with pix_fmt 'rgb24'."""
        raise Exception("User must implement the filter_type method")

    def into_filter(self):
        """Start the host process for this UDF and return a `Filter` that calls it over IPC."""
        assert self._socket_path is None
        self._socket_path = f"/tmp/vidformer-{self._name}-{str(uuid.uuid4())}.sock"
        self._p = multiprocessing.Process(
            target=_run_udf_host, args=(self, self._socket_path)
        )
        self._p.start()
        return Filter(
            name=self._name, tl_func="IPC", socket=self._socket_path, func=self._name
        )

    @staticmethod
    def _recv_exact(connection, size):
        """Read up to `size` bytes, returning fewer only if the peer closed first."""
        # recv() may legitimately return less than requested, so keep reading.
        buf = bytearray()
        while len(buf) < size:
            chunk = connection.recv(size - len(buf))
            if not chunk:
                break
            buf += chunk
        return bytes(buf)

    @staticmethod
    def _deser_call(deser, f_args, f_kwargs):
        """Decode positional and keyword arguments with the per-value decoder `deser`."""
        # Iterate .items(): iterating the dict itself yields only its keys.
        return (
            [deser(x) for x in f_args],
            {k: deser(v) for k, v in f_kwargs.items()},
        )

    def _handle_connection(self, connection):
        """Serve `filter` / `filter_type` requests on one connection until it closes."""
        try:
            while True:
                header = self._recv_exact(connection, 4)
                if len(header) != 4:
                    break
                frame_len = int.from_bytes(header, byteorder="big")
                data = self._recv_exact(connection, frame_len)
                if not data:
                    break
                if len(data) < frame_len:
                    raise Exception("Partial data received")

                obj = msgpack.unpackb(data, raw=False)
                f_op, f_args, f_kwargs = (
                    obj["op"],
                    obj["args"],
                    obj["kwargs"],
                )

                response = None
                if f_op == "filter":
                    f_args, f_kwargs = self._deser_call(
                        self._deser_filter, f_args, f_kwargs
                    )
                    response = self.filter(*f_args, **f_kwargs)
                    if type(response) is not UDFFrame:
                        raise Exception(
                            f"filter must return a UDFFrame, got {type(response)}"
                        )
                    if response.frame_type().pix_fmt() != "rgb24":
                        raise Exception(
                            f"filter must return a frame with pix_fmt 'rgb24', got {response.frame_type().pix_fmt()}"
                        )

                    response = response._response_ser()
                elif f_op == "filter_type":
                    f_args, f_kwargs = self._deser_call(
                        self._deser_filter_type, f_args, f_kwargs
                    )
                    response = self.filter_type(*f_args, **f_kwargs)
                    if type(response) is not UDFFrameType:
                        raise Exception(
                            f"filter_type must return a UDFFrameType, got {type(response)}"
                        )
                    if response.pix_fmt() != "rgb24":
                        raise Exception(
                            f"filter_type must return a frame with pix_fmt 'rgb24', got {response.pix_fmt()}"
                        )
                    response = response._response_ser()
                else:
                    raise Exception(f"Unknown operation: {f_op}")

                response = msgpack.packb(response, use_bin_type=True)
                response_len = len(response).to_bytes(4, byteorder="big")
                connection.sendall(response_len)
                connection.sendall(response)
        finally:
            connection.close()

    def _deser_filter_type(self, obj):
        """Decode one tagged `filter_type` argument (`FrameType`, `String`, `Int` or `Bool`)."""
        assert type(obj) is dict
        keys = list(obj.keys())
        assert len(keys) == 1
        type_key = keys[0]
        assert type_key in ["FrameType", "String", "Int", "Bool"]

        if type_key == "FrameType":
            frame = obj[type_key]
            assert type(frame) is dict
            assert "width" in frame
            assert "height" in frame
            assert "format" in frame
            assert type(frame["width"]) is int
            assert type(frame["height"]) is int
            assert frame["format"] == 2  # AV_PIX_FMT_RGB24
            return UDFFrameType(frame["width"], frame["height"], "rgb24")
        elif type_key == "String":
            assert type(obj[type_key]) is str
            return obj[type_key]
        elif type_key == "Int":
            assert type(obj[type_key]) is int
            return obj[type_key]
        elif type_key == "Bool":
            assert type(obj[type_key]) is bool
            return obj[type_key]
        else:
            assert False, f"Unknown type: {type_key}"

    def _deser_filter(self, obj):
        """Decode one tagged `filter` argument (`Frame`, `String`, `Int` or `Bool`)."""
        assert type(obj) is dict
        keys = list(obj.keys())
        assert len(keys) == 1
        type_key = keys[0]
        assert type_key in ["Frame", "String", "Int", "Bool"]

        if type_key == "Frame":
            frame = obj[type_key]
            assert type(frame) is dict
            assert "data" in frame
            assert "width" in frame
            assert "height" in frame
            assert "format" in frame
            assert type(frame["width"]) is int
            assert type(frame["height"]) is int
            assert frame["format"] == "rgb24"
            assert type(frame["data"]) is bytes

            # Raw bytes are reinterpreted as a height x width x 3 uint8 image.
            data = np.frombuffer(frame["data"], dtype=np.uint8)
            data = data.reshape(frame["height"], frame["width"], 3)
            return UDFFrame(
                data, UDFFrameType(frame["width"], frame["height"], "rgb24")
            )
        elif type_key == "String":
            assert type(obj[type_key]) is str
            return obj[type_key]
        elif type_key == "Int":
            assert type(obj[type_key]) is int
            return obj[type_key]
        elif type_key == "Bool":
            assert type(obj[type_key]) is bool
            return obj[type_key]
        else:
            assert False, f"Unknown type: {type_key}"

    def _host(self, socket_path: str):
        """Listen on the Unix socket at `socket_path`, serving each connection on its own thread."""
        if os.path.exists(socket_path):
            os.remove(socket_path)

        # start listening on the socket
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock.bind(socket_path)
        sock.listen(1)

        while True:
            # accept incoming connection
            connection, _client_address = sock.accept()
            thread = threading.Thread(
                target=self._handle_connection, args=(connection,)
            )
            thread.start()

    def __del__(self):
        if self._socket_path is not None:
            self._p.terminate()
            if os.path.exists(self._socket_path):
                # it's possible the process hasn't even created the socket yet
                os.remove(self._socket_path)
User-defined filter superclass
def into_filter(self):
    """Start this UDF's host process and return a `Filter` that points at it.

    A unique Unix-socket path under ``/tmp`` is generated and stored on the
    instance, a `multiprocessing.Process` targeting `_run_udf_host` is
    launched with this object and that path, and an ``"IPC"`` filter
    referencing the socket is returned.

    May only be called once per instance: the method asserts that no socket
    path has been assigned yet.
    """
    assert self._socket_path is None

    # The uuid suffix keeps concurrently hosted UDFs with the same name apart.
    sock_path = f"/tmp/vidformer-{self._name}-{uuid.uuid4()}.sock"
    self._socket_path = sock_path

    # Record the process handle on the instance before starting it.
    host_proc = multiprocessing.Process(
        target=_run_udf_host,
        args=(self, sock_path),
    )
    self._p = host_proc
    host_proc.start()

    return Filter(
        name=self._name,
        tl_func="IPC",
        socket=sock_path,
        func=self._name,
    )
class UDFFrameType:
    """
    Frame type for use in UDFs.

    Describes a frame's width, height, and pixel format name. Only pixel
    formats listed in `_PIX_FMT_CODES` can be serialized.
    """

    # Pixel format name -> integer format code written by `_response_ser`.
    _PIX_FMT_CODES = {
        "rgb24": 2,  # AV_PIX_FMT_RGB24
    }

    def __init__(self, width: int, height: int, pix_fmt: str):
        assert type(width) is int
        assert type(height) is int
        assert type(pix_fmt) is str

        self._width = width
        self._height = height
        self._pix_fmt = pix_fmt

    def width(self):
        """Return the frame width."""
        return self._width

    def height(self):
        """Return the frame height."""
        return self._height

    def pix_fmt(self):
        """Return the pixel format name (e.g. ``"rgb24"``)."""
        return self._pix_fmt

    def _response_ser(self):
        """Serialize this frame type into the response dict.

        Raises:
            ValueError: if this type's pixel format has no known format code.
                Previously the RGB24 code was emitted unconditionally, which
                would mislabel any other pixel format.
        """
        try:
            format_code = self._PIX_FMT_CODES[self._pix_fmt]
        except KeyError:
            raise ValueError(
                f"Cannot serialize frame type with unsupported pix_fmt {self._pix_fmt!r}"
            ) from None

        return {
            "frame_type": {
                "width": self._width,
                "height": self._height,
                "format": format_code,
            }
        }

    def __repr__(self):
        return f"FrameType<{self._width}x{self._height}, {self._pix_fmt}>"
Frame type for use in UDFs.
class UDFFrame:
    """A frame for use in UDFs: RGB24 pixel data paired with its frame type.

    ``data`` must be a ``uint8`` NumPy array of shape ``(height, width, 3)``
    whose dimensions agree with ``f_type``.
    """

    def __init__(self, data: np.ndarray, f_type: "UDFFrameType"):
        # Validate the pixel buffer on its own first.
        assert type(data) is np.ndarray, f"data must be np.ndarray, got {type(data)}"

        # We only support RGB24 for now
        assert data.dtype == np.uint8, f"data must be uint8, got {data.dtype}"
        # Check the rank before touching shape[2]; without this a 2-D
        # (e.g. grayscale) array fails with an opaque IndexError.
        assert data.ndim == 3, f"data must have shape (H, W, 3), got {data.shape}"
        assert data.shape[2] == 3, f"data must have 3 channels, got {data.shape[2]}"

        assert type(f_type) is UDFFrameType

        # check type matches
        assert data.shape[0] == f_type.height()
        assert data.shape[1] == f_type.width()
        assert f_type.pix_fmt() == "rgb24"

        self._data = data
        self._f_type = f_type

    def data(self):
        """Return the underlying pixel array."""
        return self._data

    def frame_type(self):
        """Return the `UDFFrameType` describing this frame."""
        return self._f_type

    def _response_ser(self):
        """Serialize the frame (raw bytes plus dimensions) into the response dict."""
        return {
            "frame": {
                "data": self._data.tobytes(),
                "width": self._f_type.width(),
                "height": self._f_type.height(),
                "format": "rgb24",
            }
        }

    def __repr__(self):
        return f"Frame<{self._f_type.width()}x{self._f_type.height()}, {self._f_type.pix_fmt()}>"
A frame for use in UDFs: RGB24 pixel data (a uint8 array of shape height × width × 3) together with its frame type.
1644 def __init__(self, data: np.ndarray, f_type: UDFFrameType): 1645 assert type(data) is np.ndarray 1646 assert type(f_type) is UDFFrameType 1647 1648 # We only support RGB24 for now 1649 assert data.dtype == np.uint8 1650 assert data.shape[2] == 3 1651 1652 # check type matches 1653 assert data.shape[0] == f_type.height() 1654 assert data.shape[1] == f_type.width() 1655 assert f_type.pix_fmt() == "rgb24" 1656 1657 self._data = data 1658 self._f_type = f_type