vidformer
vidformer-py is a Python 🐍 interface for vidformer.
Quick links:
1""" 2vidformer-py is a Python 🐍 interface for [vidformer](https://github.com/ixlab/vidformer). 3 4**Quick links:** 5* [📦 PyPI](https://pypi.org/project/vidformer/) 6* [📘 Documentation - vidformer-py](https://ixlab.github.io/vidformer/vidformer-py/pdoc/) 7* [📘 Documentation - vidformer.cv2](https://ixlab.github.io/vidformer/vidformer-py/pdoc/vidformer/cv2.html) 8* [📘 Documentation - vidformer.supervision](https://ixlab.github.io/vidformer/vidformer-py/pdoc/vidformer/supervision.html) 9* [🧑💻 Source Code](https://github.com/ixlab/vidformer/tree/main/vidformer-py/) 10""" 11 12__version__ = "1.0.1" 13 14 15import base64 16import gzip 17import json 18import struct 19import time 20from fractions import Fraction 21from urllib.parse import urlparse 22 23import requests 24 25_in_notebook = False 26try: 27 from IPython import get_ipython 28 29 if "IPKernelApp" in get_ipython().config: 30 _in_notebook = True 31except Exception: 32 pass 33 34 35def _wait_for_url(url, max_attempts=150, delay=0.1): 36 for attempt in range(max_attempts): 37 try: 38 response = requests.get(url) 39 if response.status_code == 200: 40 return response.text.strip() 41 else: 42 time.sleep(delay) 43 except requests.exceptions.RequestException: 44 time.sleep(delay) 45 return None 46 47 48def _play(namespace, hls_video_url, hls_js_url, method="html", status_url=None): 49 # The namespace is so multiple videos in one tab don't conflict 50 51 if method == "html": 52 from IPython.display import HTML 53 54 if not status_url: 55 html_code = f""" 56<!DOCTYPE html> 57<html> 58<head> 59 <title>HLS Video Player</title> 60 <!-- Include hls.js library --> 61 <script src="{hls_js_url}"></script> 62</head> 63<body> 64 <video id="video-{namespace}" controls width="640" height="360" autoplay></video> 65 <script> 66 var video = document.getElementById('video-{namespace}'); 67 var videoSrc = '{hls_video_url}'; 68 69 if (Hls.isSupported()) {{ 70 var hls = new Hls(); 71 hls.loadSource(videoSrc); 72 hls.attachMedia(video); 73 hls.on(Hls.Events.MANIFEST_PARSED, function() {{ 74 video.play(); 75 }}); 76 }} else if (video.canPlayType('application/vnd.apple.mpegurl')) {{ 77 video.src = videoSrc; 78 video.addEventListener('loadedmetadata', function() {{ 79 video.play(); 80 }}); 81 }} else {{ 82 console.error('This browser does not appear to support HLS.'); 83 }} 84 </script> 85</body> 86</html> 87""" 88 return HTML(data=html_code) 89 else: 90 html_code = f""" 91<!DOCTYPE html> 92<html> 93<head> 94 <title>HLS Video Player</title> 95 <script src="{hls_js_url}"></script> 96</head> 97<body> 98 <div id="container-{namespace}"></div> 99 <script> 100 var statusUrl = '{status_url}'; 101 var videoSrc = '{hls_video_url}'; 102 var videoNamespace = '{namespace}'; 103 104 function showWaiting() {{ 105 document.getElementById('container-{namespace}').textContent = 'Waiting...'; 106 pollStatus(); 107 }} 108 109 function pollStatus() {{ 110 setTimeout(function() {{ 111 fetch(statusUrl) 112 .then(r => r.json()) 113 .then(res => {{ 114 if (res.ready) {{ 115 document.getElementById('container-{namespace}').textContent = ''; 116 attachHls(); 117 }} else {{ 118 pollStatus(); 119 }} 120 }}) 121 .catch(e => {{ 122 console.error(e); 123 pollStatus(); 124 }}); 125 }}, 250); 126 }} 127 128 function attachHls() {{ 129 var container = document.getElementById('container-{namespace}'); 130 container.textContent = ''; 131 var video = document.createElement('video'); 132 video.id = 'video-' + videoNamespace; 133 video.controls = true; 134 video.width = 640; 135 video.height = 360; 136 container.appendChild(video); 137 if (Hls.isSupported()) {{ 138 var hls = new Hls(); 139 hls.loadSource(videoSrc); 140 hls.attachMedia(video); 141 hls.on(Hls.Events.MANIFEST_PARSED, function() {{ 142 video.play(); 143 }}); 144 }} else if (video.canPlayType('application/vnd.apple.mpegurl')) {{ 145 video.src = videoSrc; 146 video.addEventListener('loadedmetadata', function() {{ 147 video.play(); 148 }}); 149 }} 150 }} 151 152 fetch(statusUrl) 153 .then(r => r.json()) 154 .then(res => {{ 155 if (res.ready) {{ 156 attachHls(); 157 }} else {{ 158 showWaiting(); 159 }} 160 }}) 161 .catch(e => {{ 162 console.error(e); 163 showWaiting(); 164 }}); 165 </script> 166</body> 167</html> 168""" 169 return HTML(data=html_code) 170 elif method == "link": 171 return hls_video_url 172 else: 173 raise ValueError("Invalid method") 174 175 176def _feb_expr_coded_as_scalar(expr) -> bool: 177 if type(expr) is tuple: 178 expr = list(expr) 179 if type(expr) is FilterExpr: 180 return False 181 if type(expr) is list: 182 if len(expr) > 3: 183 return False 184 else: 185 return all([type(x) is int and x >= -(2**15) and x < 2**15 for x in expr]) 186 else: 187 assert type(expr) in [int, float, str, bytes, SourceExpr, bool, list] 188 return True 189 190 191class _FrameExpressionBlock: 192 def __init__(self): 193 self._functions = [] 194 self._literals = [] 195 self._sources = [] 196 self._kwarg_keys = [] 197 self._source_fracs = [] 198 self._exprs = [] 199 self._frame_exprs = [] 200 201 def __len__(self): 202 return len(self._frame_exprs) 203 204 def insert_expr(self, expr): 205 if type(expr) is SourceExpr or type(expr) is FilterExpr: 206 return self.insert_frame_expr(expr) 207 else: 208 return self.insert_data_expr(expr) 209 210 def insert_data_expr(self, data): 211 if type(data) is tuple: 212 data = list(data) 213 if type(data) is bool: 214 self._exprs.append(0x01000000_00000000 | int(data)) 215 return len(self._exprs) - 1 216 elif type(data) is int: 217 if data >= -(2**31) and data < 2**31: 218 self._exprs.append(data & 0xFFFFFFFF) 219 else: 220 self._literals.append(_json_arg(data, skip_data_anot=True)) 221 self._exprs.append(0x40000000_00000000 | len(self._literals) - 1) 222 return len(self._exprs) - 1 223 elif type(data) is float: 224 self._exprs.append( 225 0x02000000_00000000 | int.from_bytes(struct.pack("f", data)[::-1]) 226 ) 227 elif type(data) is str: 228 self._literals.append(_json_arg(data, skip_data_anot=True)) 229 self._exprs.append(0x40000000_00000000 | len(self._literals) - 1) 230 elif type(data) is bytes: 231 self._literals.append(_json_arg(data, skip_data_anot=True)) 232 self._exprs.append(0x40000000_00000000 | len(self._literals) - 1) 233 elif type(data) is list: 234 if len(data) == 0: 235 self._exprs.append(0x03000000_00000000) 236 return len(self._exprs) - 1 237 if ( 238 len(data) == 1 239 and type(data[0]) is int 240 and data[0] >= -(2**15) 241 and data[0] < 2**15 242 ): 243 self._exprs.append(0x04000000_00000000 | (data[0] & 0xFFFF)) 244 return len(self._exprs) - 1 245 if ( 246 len(data) == 2 247 and type(data[0]) is int 248 and data[0] >= -(2**15) 249 and data[0] < 2**15 250 and type(data[1]) is int 251 and data[1] >= -(2**15) 252 and data[1] < 2**15 253 ): 254 self._exprs.append( 255 0x05000000_00000000 256 | ((data[0] & 0xFFFF) << 16) 257 | (data[1] & 0xFFFF) 258 ) 259 return len(self._exprs) - 1 260 if ( 261 len(data) == 3 262 and type(data[0]) is int 263 and data[0] >= -(2**15) 264 and data[0] < 2**15 265 and type(data[1]) is int 266 and data[1] >= -(2**15) 267 and data[1] < 2**15 268 and type(data[2]) is int 269 and data[2] >= -(2**15) 270 and data[2] < 2**15 271 ): 272 self._exprs.append( 273 0x06000000_00000000 274 | ((data[0] & 0xFFFF) << 32) 275 | ((data[1] & 0xFFFF) << 16) 276 | (data[2] & 0xFFFF) 277 ) 278 return len(self._exprs) - 1 279 out = len(self._exprs) 280 member_idxs = [] 281 for member in data: 282 if _feb_expr_coded_as_scalar(member): 283 member_idxs.append(None) 284 else: 285 member_idxs.append(self.insert_data_expr(member)) 286 287 self._exprs.append(0x42000000_00000000 | len(data)) 288 289 for i in range(len(data)): 290 if member_idxs[i] is None: 291 self.insert_data_expr(data[i]) 292 else: 293 self._exprs.append(0x45000000_00000000 | member_idxs[i]) 294 295 return out 296 else: 297 raise Exception("Invalid data type") 298 299 def insert_frame_expr(self, frame): 300 if type(frame) is SourceExpr: 301 source = frame._source._name 302 if source in self._sources: 303 source_idx = self._sources.index(source) 304 else: 305 source_idx = len(self._sources) 306 self._sources.append(source) 307 if frame._is_iloc: 308 self._exprs.append( 309 0x43000000_00000000 | (source_idx << 32) | frame._idx 310 ) 311 else: 312 idx = len(self._source_fracs) // 2 313 self._source_fracs.append(frame._idx.numerator) 314 self._source_fracs.append(frame._idx.denominator) 315 self._exprs.append(0x44000000_00000000 | (source_idx << 32) | idx) 316 return len(self._exprs) - 1 317 elif type(frame) is FilterExpr: 318 func = frame._filter._func 319 if func in self._functions: 320 func_idx = self._functions.index(func) 321 else: 322 func_idx = len(self._functions) 323 self._functions.append(func) 324 len_args = len(frame._args) 325 len_kwargs = len(frame._kwargs) 326 327 arg_idxs = [] 328 for arg in frame._args: 329 if _feb_expr_coded_as_scalar(arg): 330 arg_idxs.append(None) 331 else: 332 arg_idxs.append(self.insert_expr(arg)) 333 kwarg_idxs = {} 334 for k, v in frame._kwargs.items(): 335 if _feb_expr_coded_as_scalar(v): 336 kwarg_idxs[k] = None 337 else: 338 kwarg_idxs[k] = self.insert_expr(v) 339 340 out_idx = len(self._exprs) 341 self._exprs.append( 342 0x41000000_00000000 | (len_args << 24) | (len_kwargs << 16) | func_idx 343 ) 344 for i in range(len_args): 345 if arg_idxs[i] is None: 346 # It's a scalar 347 self.insert_expr(frame._args[i]) 348 else: 349 # It's an expression pointer 350 self._exprs.append(0x45000000_00000000 | arg_idxs[i]) 351 for k, v in frame._kwargs.items(): 352 if k in self._kwarg_keys: 353 k_idx = self._kwarg_keys.index(k) 354 else: 355 k_idx = len(self._kwarg_keys) 356 self._kwarg_keys.append(k) 357 self._exprs.append(0x46000000_00000000 | k_idx) 358 if kwarg_idxs[k] is None: 359 # It's a scalar 360 self.insert_expr(v) 361 else: 362 # It's an expression pointer 363 self._exprs.append(0x45000000_00000000 | kwarg_idxs[k]) 364 return out_idx 365 else: 366 raise Exception("Invalid frame type") 367 368 def insert_frame(self, frame): 369 idx = self.insert_frame_expr(frame) 370 self._frame_exprs.append(idx) 371 372 def as_dict(self): 373 return { 374 "functions": self._functions, 375 "literals": self._literals, 376 "sources": self._sources, 377 "kwarg_keys": self._kwarg_keys, 378 "source_fracs": self._source_fracs, 379 "exprs": self._exprs, 380 "frame_exprs": self._frame_exprs, 381 } 382 383 384class Source: 385 def __init__(self, id: str, src): 386 self._name = id 387 self._fmt = { 388 "width": src["width"], 389 "height": src["height"], 390 "pix_fmt": src["pix_fmt"], 391 } 392 self._ts = [Fraction(x[0], x[1]) for x in src["ts"]] 393 self.iloc = _SourceILoc(self) 394 395 def id(self) -> str: 396 return self._name 397 398 def fmt(self): 399 return {**self._fmt} 400 401 def ts(self) -> list[Fraction]: 402 return self._ts.copy() 403 404 def __len__(self): 405 return len(self._ts) 406 407 def __getitem__(self, idx): 408 if type(idx) is not Fraction: 409 raise Exception("Source index must be a Fraction") 410 return SourceExpr(self, idx, False) 411 412 def __repr__(self): 413 return f"Source({self._name})" 414 415 416class Spec: 417 def __init__(self, id: str, src): 418 self._id = id 419 self._fmt = { 420 "width": src["width"], 421 "height": src["height"], 422 "pix_fmt": src["pix_fmt"], 423 } 424 self._vod_endpoint = src["vod_endpoint"] 425 parsed_url = urlparse(self._vod_endpoint) 426 self._hls_js_url = f"{parsed_url.scheme}://{parsed_url.netloc}/hls.js" 427 428 def id(self) -> str: 429 return self._id 430 431 def play(self, method): 432 url = f"{self._vod_endpoint}playlist.m3u8" 433 status_url = f"{self._vod_endpoint}status" 434 hls_js_url = self._hls_js_url 435 return _play(self._id, url, hls_js_url, method=method, status_url=status_url) 436 437 438class Server: 439 def __init__(self, endpoint: str, api_key: str): 440 if not endpoint.startswith("http://") and not endpoint.startswith("https://"): 441 raise Exception("Endpoint must start with http:// or https://") 442 if endpoint.endswith("/"): 443 raise Exception("Endpoint must not end with /") 444 self._endpoint = endpoint 445 446 self._api_key = api_key 447 self._session = requests.Session() 448 self._session.headers.update({"Authorization": f"Bearer {self._api_key}"}) 449 response = self._session.get( 450 f"{self._endpoint}/v2/auth", 451 headers={"Authorization": f"Bearer {self._api_key}"}, 452 ) 453 if not response.ok: 454 raise Exception(response.text) 455 response = response.json() 456 assert response["status"] == "ok" 457 458 def get_source(self, id: str) -> Source: 459 assert type(id) is str 460 response = self._session.get( 461 f"{self._endpoint}/v2/source/{id}", 462 headers={"Authorization": f"Bearer {self._api_key}"}, 463 ) 464 if not response.ok: 465 raise Exception(response.text) 466 response = response.json() 467 return Source(response["id"], response) 468 469 def list_sources(self) -> list[str]: 470 response = self._session.get( 471 f"{self._endpoint}/v2/source", 472 headers={"Authorization": f"Bearer {self._api_key}"}, 473 ) 474 if not response.ok: 475 raise Exception(response.text) 476 response = response.json() 477 return response 478 479 def delete_source(self, id: str): 480 assert type(id) is str 481 response = self._session.delete( 482 f"{self._endpoint}/v2/source/{id}", 483 headers={"Authorization": f"Bearer {self._api_key}"}, 484 ) 485 if not response.ok: 486 raise Exception(response.text) 487 response = response.json() 488 assert response["status"] == "ok" 489 490 def search_source( 491 self, name, stream_idx, storage_service, storage_config 492 ) -> list[str]: 493 assert type(name) is str 494 assert type(stream_idx) is int 495 assert type(storage_service) is str 496 assert type(storage_config) is dict 497 for k, v in storage_config.items(): 498 assert type(k) is str 499 assert type(v) is str 500 req = { 501 "name": name, 502 "stream_idx": stream_idx, 503 "storage_service": storage_service, 504 "storage_config": storage_config, 505 } 506 response = self._session.post( 507 f"{self._endpoint}/v2/source/search", 508 json=req, 509 headers={"Authorization": f"Bearer {self._api_key}"}, 510 ) 511 if not response.ok: 512 raise Exception(response.text) 513 response = response.json() 514 return response 515 516 def create_source( 517 self, name, stream_idx, storage_service, storage_config 518 ) -> Source: 519 assert type(name) is str 520 assert type(stream_idx) is int 521 assert type(storage_service) is str 522 assert type(storage_config) is dict 523 for k, v in storage_config.items(): 524 assert type(k) is str 525 assert type(v) is str 526 req = { 527 "name": name, 528 "stream_idx": stream_idx, 529 "storage_service": storage_service, 530 "storage_config": storage_config, 531 } 532 response = self._session.post( 533 f"{self._endpoint}/v2/source", 534 json=req, 535 headers={"Authorization": f"Bearer {self._api_key}"}, 536 ) 537 if not response.ok: 538 raise Exception(response.text) 539 response = response.json() 540 assert response["status"] == "ok" 541 id = response["id"] 542 return self.get_source(id) 543 544 def source(self, name, stream_idx, storage_service, storage_config) -> Source: 545 """Convenience function for accessing sources. 546 547 Tries to find a source with the given name, stream_idx, storage_service, and storage_config. 548 If no source is found, creates a new source with the given parameters. 549 """ 550 551 sources = self.search_source(name, stream_idx, storage_service, storage_config) 552 if len(sources) == 0: 553 return self.create_source(name, stream_idx, storage_service, storage_config) 554 return self.get_source(sources[0]) 555 556 def get_spec(self, id: str) -> Spec: 557 assert type(id) is str 558 response = self._session.get( 559 f"{self._endpoint}/v2/spec/{id}", 560 headers={"Authorization": f"Bearer {self._api_key}"}, 561 ) 562 if not response.ok: 563 raise Exception(response.text) 564 response = response.json() 565 return Spec(response["id"], response) 566 567 def list_specs(self) -> list[str]: 568 response = self._session.get( 569 f"{self._endpoint}/v2/spec", 570 headers={"Authorization": f"Bearer {self._api_key}"}, 571 ) 572 if not response.ok: 573 raise Exception(response.text) 574 response = response.json() 575 return response 576 577 def create_spec( 578 self, 579 width, 580 height, 581 pix_fmt, 582 vod_segment_length, 583 frame_rate, 584 ready_hook=None, 585 steer_hook=None, 586 ttl=None, 587 ) -> Spec: 588 assert type(width) is int 589 assert type(height) is int 590 assert type(pix_fmt) is str 591 assert type(vod_segment_length) is Fraction 592 assert type(frame_rate) is Fraction 593 assert type(ready_hook) is str or ready_hook is None 594 assert type(steer_hook) is str or steer_hook is None 595 assert ttl is None or type(ttl) is int 596 597 req = { 598 "width": width, 599 "height": height, 600 "pix_fmt": pix_fmt, 601 "vod_segment_length": [ 602 vod_segment_length.numerator, 603 vod_segment_length.denominator, 604 ], 605 "frame_rate": [frame_rate.numerator, frame_rate.denominator], 606 "ready_hook": ready_hook, 607 "steer_hook": steer_hook, 608 "ttl": ttl, 609 } 610 response = self._session.post( 611 f"{self._endpoint}/v2/spec", 612 json=req, 613 headers={"Authorization": f"Bearer {self._api_key}"}, 614 ) 615 if not response.ok: 616 raise Exception(response.text) 617 response = response.json() 618 assert response["status"] == "ok" 619 return self.get_spec(response["id"]) 620 621 def delete_spec(self, id: str): 622 assert type(id) is str 623 response = self._session.delete( 624 f"{self._endpoint}/v2/spec/{id}", 625 headers={"Authorization": f"Bearer {self._api_key}"}, 626 ) 627 if not response.ok: 628 raise Exception(response.text) 629 response = response.json() 630 assert response["status"] == "ok" 631 632 def export_spec( 633 self, id: str, path: str, encoder=None, encoder_opts=None, format=None 634 ): 635 assert type(id) is str 636 assert type(path) is str 637 req = { 638 "path": path, 639 "encoder": encoder, 640 "encoder_opts": encoder_opts, 641 "format": format, 642 } 643 response = self._session.post( 644 f"{self._endpoint}/v2/spec/{id}/export", 645 json=req, 646 headers={"Authorization": f"Bearer {self._api_key}"}, 647 ) 648 if not response.ok: 649 raise Exception(response.text) 650 response = response.json() 651 assert response["status"] == "ok" 652 653 def push_spec_part(self, spec_id, pos, frames, terminal): 654 if type(spec_id) is Spec: 655 spec_id = spec_id._id 656 assert type(spec_id) is str 657 assert type(pos) is int 658 assert type(frames) is list 659 assert type(terminal) is bool 660 661 req_frames = [] 662 for frame in frames: 663 assert type(frame) is tuple 664 assert len(frame) == 2 665 t = frame[0] 666 f = frame[1] 667 assert type(t) is Fraction 668 assert f is None or type(f) is SourceExpr or type(f) is FilterExpr 669 req_frames.append( 670 [ 671 [t.numerator, t.denominator], 672 f._to_json_spec() if f is not None else None, 673 ] 674 ) 675 676 req = { 677 "pos": pos, 678 "frames": req_frames, 679 "terminal": terminal, 680 } 681 response = self._session.post( 682 f"{self._endpoint}/v2/spec/{spec_id}/part", 683 json=req, 684 headers={"Authorization": f"Bearer {self._api_key}"}, 685 ) 686 if not response.ok: 687 raise Exception(response.text) 688 response = response.json() 689 assert response["status"] == "ok" 690 691 def push_spec_part_block( 692 self, spec_id: str, pos, blocks, terminal, compression="gzip" 693 ): 694 if type(spec_id) is Spec: 695 spec_id = spec_id._id 696 assert type(spec_id) is str 697 assert type(pos) is int 698 assert type(blocks) is list 699 assert type(terminal) is bool 700 assert compression is None or compression == "gzip" 701 702 req_blocks = [] 703 for block in blocks: 704 assert type(block) is _FrameExpressionBlock 705 block_body = block.as_dict() 706 block_frames = len(block_body["frame_exprs"]) 707 block_body = json.dumps(block_body).encode("utf-8") 708 if compression == "gzip": 709 block_body = gzip.compress(block_body, 1) 710 block_body = base64.b64encode(block_body).decode("utf-8") 711 req_blocks.append( 712 { 713 "frames": block_frames, 714 "compression": compression, 715 "body": block_body, 716 } 717 ) 718 719 req = { 720 "pos": pos, 721 "terminal": terminal, 722 "blocks": req_blocks, 723 } 724 response = self._session.post( 725 f"{self._endpoint}/v2/spec/{spec_id}/part_block", 726 json=req, 727 headers={"Authorization": f"Bearer {self._api_key}"}, 728 ) 729 if not response.ok: 730 raise Exception(response.text) 731 response = response.json() 732 assert response["status"] == "ok" 733 734 def frame(self, width, height, pix_fmt, frame_expr, compression="gzip"): 735 assert type(frame_expr) is FilterExpr or type(frame_expr) is SourceExpr 736 assert compression is None or compression in ["gzip"] 737 feb = _FrameExpressionBlock() 738 feb.insert_frame(frame_expr) 739 feb_body = feb.as_dict() 740 741 feb_body = json.dumps(feb_body).encode("utf-8") 742 if compression == "gzip": 743 feb_body = gzip.compress(feb_body, 1) 744 feb_body = base64.b64encode(feb_body).decode("utf-8") 745 req = { 746 "width": width, 747 "height": height, 748 "pix_fmt": pix_fmt, 749 "compression": compression, 750 "block": { 751 "frames": 1, 752 "compression": compression, 753 "body": feb_body, 754 }, 755 } 756 response = self._session.post( 757 f"{self._endpoint}/v2/frame", 758 json=req, 759 headers={"Authorization": f"Bearer {self._api_key}"}, 760 ) 761 if not response.ok: 762 raise Exception(response.text) 763 response_body = response.content 764 assert type(response_body) is bytes 765 if compression == "gzip": 766 response_body = gzip.decompress(response_body) 767 return response_body 768 769 770class SourceExpr: 771 def __init__(self, source, idx, is_iloc): 772 self._source = source 773 self._idx = idx 774 self._is_iloc = is_iloc 775 776 def __repr__(self): 777 if self._is_iloc: 778 return f"{self._source._name}.iloc[{self._idx}]" 779 else: 780 return f"{self._source._name}[{self._idx}]" 781 782 def _to_json_spec(self): 783 if self._is_iloc: 784 return { 785 "Source": { 786 "video": self._source._name, 787 "index": {"ILoc": int(self._idx)}, 788 } 789 } 790 else: 791 return { 792 "Source": { 793 "video": self._source._name, 794 "index": {"T": [self._idx.numerator, self._idx.denominator]}, 795 } 796 } 797 798 def _sources(self): 799 return set([self._source]) 800 801 def _filters(self): 802 return {} 803 804 805class _SourceILoc: 806 def __init__(self, source): 807 self._source = source 808 809 def __getitem__(self, idx): 810 if type(idx) is not int: 811 raise Exception(f"Source iloc index must be an integer, got a {type(idx)}") 812 return SourceExpr(self._source, idx, True) 813 814 815def _json_arg(arg, skip_data_anot=False): 816 if type(arg) is FilterExpr or type(arg) is SourceExpr: 817 return {"Frame": arg._to_json_spec()} 818 elif type(arg) is int: 819 if skip_data_anot: 820 return {"Int": arg} 821 return {"Data": {"Int": arg}} 822 elif type(arg) is str: 823 if skip_data_anot: 824 return {"String": arg} 825 return {"Data": {"String": arg}} 826 elif type(arg) is bytes: 827 arg = list(arg) 828 if skip_data_anot: 829 return {"Bytes": arg} 830 return {"Data": {"Bytes": arg}} 831 elif type(arg) is float: 832 if skip_data_anot: 833 return {"Float": arg} 834 return {"Data": {"Float": arg}} 835 elif type(arg) is bool: 836 if skip_data_anot: 837 return {"Bool": arg} 838 return {"Data": {"Bool": arg}} 839 elif type(arg) is tuple or type(arg) is list: 840 if skip_data_anot: 841 return {"List": [_json_arg(x, True) for x in list(arg)]} 842 return {"Data": {"List": [_json_arg(x, True) for x in list(arg)]}} 843 else: 844 raise Exception(f"Unknown arg type: {type(arg)}") 845 846 847class Filter: 848 """A video filter.""" 849 850 def __init__(self, func: str): 851 self._func = func 852 853 def __call__(self, *args, **kwargs): 854 return FilterExpr(self, args, kwargs) 855 856 857class FilterExpr: 858 def __init__(self, filter: Filter, args, kwargs): 859 self._filter = filter 860 self._args = args 861 self._kwargs = kwargs 862 863 def __repr__(self): 864 args = [] 865 for arg in self._args: 866 val = f'"{arg}"' if type(arg) is str else str(arg) 867 args.append(str(val)) 868 for k, v in self._kwargs.items(): 869 val = f'"{v}"' if type(v) is str else str(v) 870 args.append(f"{k}={val}") 871 return f"{self._filter._name}({', '.join(args)})" 872 873 def _to_json_spec(self): 874 args = [] 875 for arg in self._args: 876 args.append(_json_arg(arg)) 877 kwargs = {} 878 for k, v in self._kwargs.items(): 879 kwargs[k] = _json_arg(v) 880 return {"Filter": {"name": self._filter._name, "args": args, "kwargs": kwargs}} 881 882 def _sources(self): 883 s = set() 884 for arg in self._args: 885 if type(arg) is FilterExpr or type(arg) is SourceExpr: 886 s = s.union(arg._sources()) 887 for arg in self._kwargs.values(): 888 if type(arg) is FilterExpr or type(arg) is SourceExpr: 889 s = s.union(arg._sources()) 890 return s 891 892 def _filters(self): 893 f = {self._filter._name: self._filter} 894 for arg in self._args: 895 if type(arg) is FilterExpr: 896 f = {**f, **arg._filters()} 897 for arg in self._kwargs.values(): 898 if type(arg) is FilterExpr: 899 f = {**f, **arg._filters()} 900 return f
class
Source:
385class Source: 386 def __init__(self, id: str, src): 387 self._name = id 388 self._fmt = { 389 "width": src["width"], 390 "height": src["height"], 391 "pix_fmt": src["pix_fmt"], 392 } 393 self._ts = [Fraction(x[0], x[1]) for x in src["ts"]] 394 self.iloc = _SourceILoc(self) 395 396 def id(self) -> str: 397 return self._name 398 399 def fmt(self): 400 return {**self._fmt} 401 402 def ts(self) -> list[Fraction]: 403 return self._ts.copy() 404 405 def __len__(self): 406 return len(self._ts) 407 408 def __getitem__(self, idx): 409 if type(idx) is not Fraction: 410 raise Exception("Source index must be a Fraction") 411 return SourceExpr(self, idx, False) 412 413 def __repr__(self): 414 return f"Source({self._name})"
class
Spec:
417class Spec: 418 def __init__(self, id: str, src): 419 self._id = id 420 self._fmt = { 421 "width": src["width"], 422 "height": src["height"], 423 "pix_fmt": src["pix_fmt"], 424 } 425 self._vod_endpoint = src["vod_endpoint"] 426 parsed_url = urlparse(self._vod_endpoint) 427 self._hls_js_url = f"{parsed_url.scheme}://{parsed_url.netloc}/hls.js" 428 429 def id(self) -> str: 430 return self._id 431 432 def play(self, method): 433 url = f"{self._vod_endpoint}playlist.m3u8" 434 status_url = f"{self._vod_endpoint}status" 435 hls_js_url = self._hls_js_url 436 return _play(self._id, url, hls_js_url, method=method, status_url=status_url)
Spec(id: str, src)
418 def __init__(self, id: str, src): 419 self._id = id 420 self._fmt = { 421 "width": src["width"], 422 "height": src["height"], 423 "pix_fmt": src["pix_fmt"], 424 } 425 self._vod_endpoint = src["vod_endpoint"] 426 parsed_url = urlparse(self._vod_endpoint) 427 self._hls_js_url = f"{parsed_url.scheme}://{parsed_url.netloc}/hls.js"
class
Server:
439class Server: 440 def __init__(self, endpoint: str, api_key: str): 441 if not endpoint.startswith("http://") and not endpoint.startswith("https://"): 442 raise Exception("Endpoint must start with http:// or https://") 443 if endpoint.endswith("/"): 444 raise Exception("Endpoint must not end with /") 445 self._endpoint = endpoint 446 447 self._api_key = api_key 448 self._session = requests.Session() 449 self._session.headers.update({"Authorization": f"Bearer {self._api_key}"}) 450 response = self._session.get( 451 f"{self._endpoint}/v2/auth", 452 headers={"Authorization": f"Bearer {self._api_key}"}, 453 ) 454 if not response.ok: 455 raise Exception(response.text) 456 response = response.json() 457 assert response["status"] == "ok" 458 459 def get_source(self, id: str) -> Source: 460 assert type(id) is str 461 response = self._session.get( 462 f"{self._endpoint}/v2/source/{id}", 463 headers={"Authorization": f"Bearer {self._api_key}"}, 464 ) 465 if not response.ok: 466 raise Exception(response.text) 467 response = response.json() 468 return Source(response["id"], response) 469 470 def list_sources(self) -> list[str]: 471 response = self._session.get( 472 f"{self._endpoint}/v2/source", 473 headers={"Authorization": f"Bearer {self._api_key}"}, 474 ) 475 if not response.ok: 476 raise Exception(response.text) 477 response = response.json() 478 return response 479 480 def delete_source(self, id: str): 481 assert type(id) is str 482 response = self._session.delete( 483 f"{self._endpoint}/v2/source/{id}", 484 headers={"Authorization": f"Bearer {self._api_key}"}, 485 ) 486 if not response.ok: 487 raise Exception(response.text) 488 response = response.json() 489 assert response["status"] == "ok" 490 491 def search_source( 492 self, name, stream_idx, storage_service, storage_config 493 ) -> list[str]: 494 assert type(name) is str 495 assert type(stream_idx) is int 496 assert type(storage_service) is str 497 assert type(storage_config) is dict 498 for k, v in storage_config.items(): 499 assert type(k) is str 500 assert type(v) is str 501 req = { 502 "name": name, 503 "stream_idx": stream_idx, 504 "storage_service": storage_service, 505 "storage_config": storage_config, 506 } 507 response = self._session.post( 508 f"{self._endpoint}/v2/source/search", 509 json=req, 510 headers={"Authorization": f"Bearer {self._api_key}"}, 511 ) 512 if not response.ok: 513 raise Exception(response.text) 514 response = response.json() 515 return response 516 517 def create_source( 518 self, name, stream_idx, storage_service, storage_config 519 ) -> Source: 520 assert type(name) is str 521 assert type(stream_idx) is int 522 assert type(storage_service) is str 523 assert type(storage_config) is dict 524 for k, v in storage_config.items(): 525 assert type(k) is str 526 assert type(v) is str 527 req = { 528 "name": name, 529 "stream_idx": stream_idx, 530 "storage_service": storage_service, 531 "storage_config": storage_config, 532 } 533 response = self._session.post( 534 f"{self._endpoint}/v2/source", 535 json=req, 536 headers={"Authorization": f"Bearer {self._api_key}"}, 537 ) 538 if not response.ok: 539 raise Exception(response.text) 540 response = response.json() 541 assert response["status"] == "ok" 542 id = response["id"] 543 return self.get_source(id) 544 545 def source(self, name, stream_idx, storage_service, storage_config) -> Source: 546 """Convenience function for accessing sources. 547 548 Tries to find a source with the given name, stream_idx, storage_service, and storage_config. 549 If no source is found, creates a new source with the given parameters. 550 """ 551 552 sources = self.search_source(name, stream_idx, storage_service, storage_config) 553 if len(sources) == 0: 554 return self.create_source(name, stream_idx, storage_service, storage_config) 555 return self.get_source(sources[0]) 556 557 def get_spec(self, id: str) -> Spec: 558 assert type(id) is str 559 response = self._session.get( 560 f"{self._endpoint}/v2/spec/{id}", 561 headers={"Authorization": f"Bearer {self._api_key}"}, 562 ) 563 if not response.ok: 564 raise Exception(response.text) 565 response = response.json() 566 return Spec(response["id"], response) 567 568 def list_specs(self) -> list[str]: 569 response = self._session.get( 570 f"{self._endpoint}/v2/spec", 571 headers={"Authorization": f"Bearer {self._api_key}"}, 572 ) 573 if not response.ok: 574 raise Exception(response.text) 575 response = response.json() 576 return response 577 578 def create_spec( 579 self, 580 width, 581 height, 582 pix_fmt, 583 vod_segment_length, 584 frame_rate, 585 ready_hook=None, 586 steer_hook=None, 587 ttl=None, 588 ) -> Spec: 589 assert type(width) is int 590 assert type(height) is int 591 assert type(pix_fmt) is str 592 assert type(vod_segment_length) is Fraction 593 assert type(frame_rate) is Fraction 594 assert type(ready_hook) is str or ready_hook is None 595 assert type(steer_hook) is str or steer_hook is None 596 assert ttl is None or type(ttl) is int 597 598 req = { 599 "width": width, 600 "height": height, 601 "pix_fmt": pix_fmt, 602 "vod_segment_length": [ 603 vod_segment_length.numerator, 604 vod_segment_length.denominator, 605 ], 606 "frame_rate": [frame_rate.numerator, frame_rate.denominator], 607 "ready_hook": ready_hook, 608 "steer_hook": steer_hook, 609 "ttl": ttl, 610 } 611 response = self._session.post( 612 f"{self._endpoint}/v2/spec", 613 json=req, 614 headers={"Authorization": f"Bearer {self._api_key}"}, 615 ) 616 if not response.ok: 617 raise Exception(response.text) 618 response = response.json() 619 assert response["status"] == "ok" 620 return self.get_spec(response["id"]) 621 622 def delete_spec(self, id: str): 623 assert type(id) is str 624 response = self._session.delete( 625 f"{self._endpoint}/v2/spec/{id}", 626 headers={"Authorization": f"Bearer {self._api_key}"}, 627 ) 628 if not response.ok: 629 raise Exception(response.text) 630 response = response.json() 631 assert response["status"] == "ok" 632 633 def export_spec( 634 self, id: str, path: str, encoder=None, encoder_opts=None, format=None 635 ): 636 assert type(id) is str 637 assert type(path) is str 638 req = { 639 "path": path, 640 "encoder": encoder, 641 "encoder_opts": encoder_opts, 642 "format": format, 643 } 644 response = self._session.post( 645 f"{self._endpoint}/v2/spec/{id}/export", 646 json=req, 647 headers={"Authorization": f"Bearer {self._api_key}"}, 648 ) 649 if not response.ok: 650 raise Exception(response.text) 651 response = response.json() 652 assert response["status"] == "ok" 653 654 def push_spec_part(self, spec_id, pos, frames, terminal): 655 if type(spec_id) is Spec: 656 spec_id = spec_id._id 657 assert type(spec_id) is str 658 assert type(pos) is int 659 assert type(frames) is list 660 assert type(terminal) is bool 661 662 req_frames = [] 663 for frame in frames: 664 assert type(frame) is tuple 665 assert len(frame) == 2 666 t = frame[0] 667 f = frame[1] 668 assert type(t) is Fraction 669 assert f is None or type(f) is SourceExpr or type(f) is FilterExpr 670 req_frames.append( 671 [ 672 [t.numerator, t.denominator], 673 f._to_json_spec() if f is not None else None, 674 ] 675 ) 676 677 req = { 678 "pos": pos, 679 "frames": req_frames, 680 "terminal": terminal, 681 } 682 response = self._session.post( 683 f"{self._endpoint}/v2/spec/{spec_id}/part", 684 json=req, 685 headers={"Authorization": f"Bearer {self._api_key}"}, 686 ) 687 if not response.ok: 688 raise Exception(response.text) 689 response = response.json() 690 assert response["status"] == "ok" 691 692 def push_spec_part_block( 693 self, spec_id: str, pos, blocks, terminal, compression="gzip" 694 ): 695 if type(spec_id) is Spec: 696 spec_id = spec_id._id 697 assert type(spec_id) is str 698 assert type(pos) is int 699 assert type(blocks) is list 700 assert type(terminal) is bool 701 assert compression is None or compression == "gzip" 702 703 req_blocks = [] 704 for block in blocks: 705 assert type(block) is _FrameExpressionBlock 706 block_body = block.as_dict() 707 block_frames = len(block_body["frame_exprs"]) 708 block_body = json.dumps(block_body).encode("utf-8") 709 if compression == "gzip": 710 block_body = gzip.compress(block_body, 1) 711 block_body = base64.b64encode(block_body).decode("utf-8") 712 req_blocks.append( 713 { 714 "frames": block_frames, 715 "compression": compression, 716 "body": block_body, 717 } 718 ) 719 720 req = { 721 "pos": pos, 722 "terminal": terminal, 723 "blocks": req_blocks, 724 } 725 response = self._session.post( 726 f"{self._endpoint}/v2/spec/{spec_id}/part_block", 727 json=req, 728 headers={"Authorization": f"Bearer {self._api_key}"}, 729 ) 730 if not response.ok: 731 raise Exception(response.text) 732 response = response.json() 733 assert response["status"] == "ok" 734 735 def frame(self, width, height, pix_fmt, frame_expr, compression="gzip"): 736 assert type(frame_expr) is FilterExpr or type(frame_expr) is SourceExpr 737 assert compression is None or compression in ["gzip"] 738 feb = _FrameExpressionBlock() 739 feb.insert_frame(frame_expr) 740 feb_body = feb.as_dict() 741 742 feb_body = json.dumps(feb_body).encode("utf-8") 743 if compression == "gzip": 744 feb_body = gzip.compress(feb_body, 1) 745 feb_body = base64.b64encode(feb_body).decode("utf-8") 746 req = { 747 "width": width, 748 "height": height, 749 "pix_fmt": pix_fmt, 750 "compression": compression, 751 "block": { 752 "frames": 1, 753 "compression": compression, 754 "body": feb_body, 755 }, 756 } 757 response = self._session.post( 758 f"{self._endpoint}/v2/frame", 759 json=req, 760 headers={"Authorization": f"Bearer {self._api_key}"}, 761 ) 762 if not response.ok: 763 raise Exception(response.text) 764 response_body = response.content 765 assert type(response_body) is bytes 766 if compression == "gzip": 767 response_body = gzip.decompress(response_body) 768 return response_body
Server(endpoint: str, api_key: str)
440 def __init__(self, endpoint: str, api_key: str): 441 if not endpoint.startswith("http://") and not endpoint.startswith("https://"): 442 raise Exception("Endpoint must start with http:// or https://") 443 if endpoint.endswith("/"): 444 raise Exception("Endpoint must not end with /") 445 self._endpoint = endpoint 446 447 self._api_key = api_key 448 self._session = requests.Session() 449 self._session.headers.update({"Authorization": f"Bearer {self._api_key}"}) 450 response = self._session.get( 451 f"{self._endpoint}/v2/auth", 452 headers={"Authorization": f"Bearer {self._api_key}"}, 453 ) 454 if not response.ok: 455 raise Exception(response.text) 456 response = response.json() 457 assert response["status"] == "ok"
459 def get_source(self, id: str) -> Source: 460 assert type(id) is str 461 response = self._session.get( 462 f"{self._endpoint}/v2/source/{id}", 463 headers={"Authorization": f"Bearer {self._api_key}"}, 464 ) 465 if not response.ok: 466 raise Exception(response.text) 467 response = response.json() 468 return Source(response["id"], response)
def
delete_source(self, id: str):
480 def delete_source(self, id: str): 481 assert type(id) is str 482 response = self._session.delete( 483 f"{self._endpoint}/v2/source/{id}", 484 headers={"Authorization": f"Bearer {self._api_key}"}, 485 ) 486 if not response.ok: 487 raise Exception(response.text) 488 response = response.json() 489 assert response["status"] == "ok"
def
search_source(self, name, stream_idx, storage_service, storage_config) -> list[str]:
491 def search_source( 492 self, name, stream_idx, storage_service, storage_config 493 ) -> list[str]: 494 assert type(name) is str 495 assert type(stream_idx) is int 496 assert type(storage_service) is str 497 assert type(storage_config) is dict 498 for k, v in storage_config.items(): 499 assert type(k) is str 500 assert type(v) is str 501 req = { 502 "name": name, 503 "stream_idx": stream_idx, 504 "storage_service": storage_service, 505 "storage_config": storage_config, 506 } 507 response = self._session.post( 508 f"{self._endpoint}/v2/source/search", 509 json=req, 510 headers={"Authorization": f"Bearer {self._api_key}"}, 511 ) 512 if not response.ok: 513 raise Exception(response.text) 514 response = response.json() 515 return response
517 def create_source( 518 self, name, stream_idx, storage_service, storage_config 519 ) -> Source: 520 assert type(name) is str 521 assert type(stream_idx) is int 522 assert type(storage_service) is str 523 assert type(storage_config) is dict 524 for k, v in storage_config.items(): 525 assert type(k) is str 526 assert type(v) is str 527 req = { 528 "name": name, 529 "stream_idx": stream_idx, 530 "storage_service": storage_service, 531 "storage_config": storage_config, 532 } 533 response = self._session.post( 534 f"{self._endpoint}/v2/source", 535 json=req, 536 headers={"Authorization": f"Bearer {self._api_key}"}, 537 ) 538 if not response.ok: 539 raise Exception(response.text) 540 response = response.json() 541 assert response["status"] == "ok" 542 id = response["id"] 543 return self.get_source(id)
545 def source(self, name, stream_idx, storage_service, storage_config) -> Source: 546 """Convenience function for accessing sources. 547 548 Tries to find a source with the given name, stream_idx, storage_service, and storage_config. 549 If no source is found, creates a new source with the given parameters. 550 """ 551 552 sources = self.search_source(name, stream_idx, storage_service, storage_config) 553 if len(sources) == 0: 554 return self.create_source(name, stream_idx, storage_service, storage_config) 555 return self.get_source(sources[0])
Convenience function for accessing sources.
Tries to find a source with the given name, stream_idx, storage_service, and storage_config. If no source is found, creates a new source with the given parameters.
557 def get_spec(self, id: str) -> Spec: 558 assert type(id) is str 559 response = self._session.get( 560 f"{self._endpoint}/v2/spec/{id}", 561 headers={"Authorization": f"Bearer {self._api_key}"}, 562 ) 563 if not response.ok: 564 raise Exception(response.text) 565 response = response.json() 566 return Spec(response["id"], response)
def
create_spec( self, width, height, pix_fmt, vod_segment_length, frame_rate, ready_hook=None, steer_hook=None, ttl=None) -> Spec:
578 def create_spec( 579 self, 580 width, 581 height, 582 pix_fmt, 583 vod_segment_length, 584 frame_rate, 585 ready_hook=None, 586 steer_hook=None, 587 ttl=None, 588 ) -> Spec: 589 assert type(width) is int 590 assert type(height) is int 591 assert type(pix_fmt) is str 592 assert type(vod_segment_length) is Fraction 593 assert type(frame_rate) is Fraction 594 assert type(ready_hook) is str or ready_hook is None 595 assert type(steer_hook) is str or steer_hook is None 596 assert ttl is None or type(ttl) is int 597 598 req = { 599 "width": width, 600 "height": height, 601 "pix_fmt": pix_fmt, 602 "vod_segment_length": [ 603 vod_segment_length.numerator, 604 vod_segment_length.denominator, 605 ], 606 "frame_rate": [frame_rate.numerator, frame_rate.denominator], 607 "ready_hook": ready_hook, 608 "steer_hook": steer_hook, 609 "ttl": ttl, 610 } 611 response = self._session.post( 612 f"{self._endpoint}/v2/spec", 613 json=req, 614 headers={"Authorization": f"Bearer {self._api_key}"}, 615 ) 616 if not response.ok: 617 raise Exception(response.text) 618 response = response.json() 619 assert response["status"] == "ok" 620 return self.get_spec(response["id"])
def
delete_spec(self, id: str):
622 def delete_spec(self, id: str): 623 assert type(id) is str 624 response = self._session.delete( 625 f"{self._endpoint}/v2/spec/{id}", 626 headers={"Authorization": f"Bearer {self._api_key}"}, 627 ) 628 if not response.ok: 629 raise Exception(response.text) 630 response = response.json() 631 assert response["status"] == "ok"
def
export_spec( self, id: str, path: str, encoder=None, encoder_opts=None, format=None):
633 def export_spec( 634 self, id: str, path: str, encoder=None, encoder_opts=None, format=None 635 ): 636 assert type(id) is str 637 assert type(path) is str 638 req = { 639 "path": path, 640 "encoder": encoder, 641 "encoder_opts": encoder_opts, 642 "format": format, 643 } 644 response = self._session.post( 645 f"{self._endpoint}/v2/spec/{id}/export", 646 json=req, 647 headers={"Authorization": f"Bearer {self._api_key}"}, 648 ) 649 if not response.ok: 650 raise Exception(response.text) 651 response = response.json() 652 assert response["status"] == "ok"
def
push_spec_part(self, spec_id, pos, frames, terminal):
654 def push_spec_part(self, spec_id, pos, frames, terminal): 655 if type(spec_id) is Spec: 656 spec_id = spec_id._id 657 assert type(spec_id) is str 658 assert type(pos) is int 659 assert type(frames) is list 660 assert type(terminal) is bool 661 662 req_frames = [] 663 for frame in frames: 664 assert type(frame) is tuple 665 assert len(frame) == 2 666 t = frame[0] 667 f = frame[1] 668 assert type(t) is Fraction 669 assert f is None or type(f) is SourceExpr or type(f) is FilterExpr 670 req_frames.append( 671 [ 672 [t.numerator, t.denominator], 673 f._to_json_spec() if f is not None else None, 674 ] 675 ) 676 677 req = { 678 "pos": pos, 679 "frames": req_frames, 680 "terminal": terminal, 681 } 682 response = self._session.post( 683 f"{self._endpoint}/v2/spec/{spec_id}/part", 684 json=req, 685 headers={"Authorization": f"Bearer {self._api_key}"}, 686 ) 687 if not response.ok: 688 raise Exception(response.text) 689 response = response.json() 690 assert response["status"] == "ok"
def
push_spec_part_block(self, spec_id: str, pos, blocks, terminal, compression='gzip'):
692 def push_spec_part_block( 693 self, spec_id: str, pos, blocks, terminal, compression="gzip" 694 ): 695 if type(spec_id) is Spec: 696 spec_id = spec_id._id 697 assert type(spec_id) is str 698 assert type(pos) is int 699 assert type(blocks) is list 700 assert type(terminal) is bool 701 assert compression is None or compression == "gzip" 702 703 req_blocks = [] 704 for block in blocks: 705 assert type(block) is _FrameExpressionBlock 706 block_body = block.as_dict() 707 block_frames = len(block_body["frame_exprs"]) 708 block_body = json.dumps(block_body).encode("utf-8") 709 if compression == "gzip": 710 block_body = gzip.compress(block_body, 1) 711 block_body = base64.b64encode(block_body).decode("utf-8") 712 req_blocks.append( 713 { 714 "frames": block_frames, 715 "compression": compression, 716 "body": block_body, 717 } 718 ) 719 720 req = { 721 "pos": pos, 722 "terminal": terminal, 723 "blocks": req_blocks, 724 } 725 response = self._session.post( 726 f"{self._endpoint}/v2/spec/{spec_id}/part_block", 727 json=req, 728 headers={"Authorization": f"Bearer {self._api_key}"}, 729 ) 730 if not response.ok: 731 raise Exception(response.text) 732 response = response.json() 733 assert response["status"] == "ok"
def
frame(self, width, height, pix_fmt, frame_expr, compression='gzip'):
735 def frame(self, width, height, pix_fmt, frame_expr, compression="gzip"): 736 assert type(frame_expr) is FilterExpr or type(frame_expr) is SourceExpr 737 assert compression is None or compression in ["gzip"] 738 feb = _FrameExpressionBlock() 739 feb.insert_frame(frame_expr) 740 feb_body = feb.as_dict() 741 742 feb_body = json.dumps(feb_body).encode("utf-8") 743 if compression == "gzip": 744 feb_body = gzip.compress(feb_body, 1) 745 feb_body = base64.b64encode(feb_body).decode("utf-8") 746 req = { 747 "width": width, 748 "height": height, 749 "pix_fmt": pix_fmt, 750 "compression": compression, 751 "block": { 752 "frames": 1, 753 "compression": compression, 754 "body": feb_body, 755 }, 756 } 757 response = self._session.post( 758 f"{self._endpoint}/v2/frame", 759 json=req, 760 headers={"Authorization": f"Bearer {self._api_key}"}, 761 ) 762 if not response.ok: 763 raise Exception(response.text) 764 response_body = response.content 765 assert type(response_body) is bytes 766 if compression == "gzip": 767 response_body = gzip.decompress(response_body) 768 return response_body
class
SourceExpr:
771class SourceExpr: 772 def __init__(self, source, idx, is_iloc): 773 self._source = source 774 self._idx = idx 775 self._is_iloc = is_iloc 776 777 def __repr__(self): 778 if self._is_iloc: 779 return f"{self._source._name}.iloc[{self._idx}]" 780 else: 781 return f"{self._source._name}[{self._idx}]" 782 783 def _to_json_spec(self): 784 if self._is_iloc: 785 return { 786 "Source": { 787 "video": self._source._name, 788 "index": {"ILoc": int(self._idx)}, 789 } 790 } 791 else: 792 return { 793 "Source": { 794 "video": self._source._name, 795 "index": {"T": [self._idx.numerator, self._idx.denominator]}, 796 } 797 } 798 799 def _sources(self): 800 return set([self._source]) 801 802 def _filters(self): 803 return {}
class
Filter:
848class Filter: 849 """A video filter.""" 850 851 def __init__(self, func: str): 852 self._func = func 853 854 def __call__(self, *args, **kwargs): 855 return FilterExpr(self, args, kwargs)
A video filter.
class
FilterExpr:
858class FilterExpr: 859 def __init__(self, filter: Filter, args, kwargs): 860 self._filter = filter 861 self._args = args 862 self._kwargs = kwargs 863 864 def __repr__(self): 865 args = [] 866 for arg in self._args: 867 val = f'"{arg}"' if type(arg) is str else str(arg) 868 args.append(str(val)) 869 for k, v in self._kwargs.items(): 870 val = f'"{v}"' if type(v) is str else str(v) 871 args.append(f"{k}={val}") 872 return f"{self._filter._name}({', '.join(args)})" 873 874 def _to_json_spec(self): 875 args = [] 876 for arg in self._args: 877 args.append(_json_arg(arg)) 878 kwargs = {} 879 for k, v in self._kwargs.items(): 880 kwargs[k] = _json_arg(v) 881 return {"Filter": {"name": self._filter._name, "args": args, "kwargs": kwargs}} 882 883 def _sources(self): 884 s = set() 885 for arg in self._args: 886 if type(arg) is FilterExpr or type(arg) is SourceExpr: 887 s = s.union(arg._sources()) 888 for arg in self._kwargs.values(): 889 if type(arg) is FilterExpr or type(arg) is SourceExpr: 890 s = s.union(arg._sources()) 891 return s 892 893 def _filters(self): 894 f = {self._filter._name: self._filter} 895 for arg in self._args: 896 if type(arg) is FilterExpr: 897 f = {**f, **arg._filters()} 898 for arg in self._kwargs.values(): 899 if type(arg) is FilterExpr: 900 f = {**f, **arg._filters()} 901 return f