vidformer
vidformer-py is a Python 🐍 interface for vidformer.
Quick links:
"""
vidformer-py is a Python 🐍 interface for [vidformer](https://github.com/ixlab/vidformer).

**Quick links:**
* [📦 PyPI](https://pypi.org/project/vidformer/)
* [📘 Documentation - vidformer-py](https://ixlab.github.io/vidformer/vidformer-py/pdoc/)
* [📘 Documentation - vidformer.cv2](https://ixlab.github.io/vidformer/vidformer-py/pdoc/vidformer/cv2.html)
* [📘 Documentation - vidformer.supervision](https://ixlab.github.io/vidformer/vidformer-py/pdoc/vidformer/supervision.html)
* [🧑💻 Source Code](https://github.com/ixlab/vidformer/tree/main/vidformer-py/)
"""

__version__ = "1.2.2"


import base64
import gzip
import json
import struct
import time
from fractions import Fraction
from urllib.parse import urlparse

import requests

_in_notebook = False
try:
    from IPython import get_ipython

    # get_ipython() returns None outside IPython; the resulting AttributeError
    # is swallowed on purpose so the flag simply stays False.
    if "IPKernelApp" in get_ipython().config:
        _in_notebook = True
except Exception:
    pass


def _wait_for_url(url, max_attempts=150, delay=0.1):
    """Poll ``url`` until it answers with HTTP 200.

    Returns the stripped response text of the first 200 response, or ``None``
    when ``max_attempts`` polls (``delay`` seconds apart) all failed.
    """
    for _ in range(max_attempts):
        try:
            response = requests.get(url)
            if response.status_code == 200:
                return response.text.strip()
        except requests.exceptions.RequestException:
            pass
        time.sleep(delay)
    return None


def _play(namespace, hls_video_url, hls_js_url, method="display", status_url=None):
    """Render an HLS stream as a notebook widget, an HTML object, or a link.

    ``method`` selects the output:

    * ``"display"``: build the HTML object and pass it to IPython's
      ``display``; returns ``None``.
    * ``"html"``: return an ``IPython.display.HTML`` object. When
      ``status_url`` is set the page polls it until ``ready`` is truthy
      before attaching the player.
    * ``"link"``: return ``hls_video_url`` unchanged.

    Raises ``ValueError`` for any other ``method``.
    """
    # The namespace is so multiple videos in one tab don't conflict

    if method == "display":
        from IPython.display import display

        display(
            _play(
                namespace,
                hls_video_url,
                hls_js_url,
                method="html",
                status_url=status_url,
            )
        )
        return
    if method == "html":
        from IPython.display import HTML

        if not status_url:
            html_code = f"""
<!DOCTYPE html>
<html>
<head>
    <title>HLS Video Player</title>
    <!-- Include hls.js library -->
    <script src="{hls_js_url}"></script>
</head>
<body>
    <video id="video-{namespace}" controls width="640" height="360" autoplay></video>
    <script>
        var video = document.getElementById('video-{namespace}');
        var videoSrc = '{hls_video_url}';

        if (Hls.isSupported()) {{
            var hls = new Hls();
            hls.loadSource(videoSrc);
            hls.attachMedia(video);
            hls.on(Hls.Events.MANIFEST_PARSED, function() {{
                video.play();
            }});
        }} else if (video.canPlayType('application/vnd.apple.mpegurl')) {{
            video.src = videoSrc;
            video.addEventListener('loadedmetadata', function() {{
                video.play();
            }});
        }} else {{
            console.error('This browser does not appear to support HLS.');
        }}
    </script>
</body>
</html>
"""
            return HTML(data=html_code)
        else:
            html_code = f"""
<!DOCTYPE html>
<html>
<head>
    <title>HLS Video Player</title>
    <script src="{hls_js_url}"></script>
</head>
<body>
    <div id="container-{namespace}"></div>
    <script>
        var statusUrl = '{status_url}';
        var videoSrc = '{hls_video_url}';
        var videoNamespace = '{namespace}';

        function showWaiting() {{
            document.getElementById('container-{namespace}').textContent = 'Waiting...';
            pollStatus();
        }}

        function pollStatus() {{
            setTimeout(function() {{
                fetch(statusUrl)
                    .then(r => r.json())
                    .then(res => {{
                        if (res.ready) {{
                            document.getElementById('container-{namespace}').textContent = '';
                            attachHls();
                        }} else {{
                            pollStatus();
                        }}
                    }})
                    .catch(e => {{
                        console.error(e);
                        pollStatus();
                    }});
            }}, 250);
        }}

        function attachHls() {{
            var container = document.getElementById('container-{namespace}');
            container.textContent = '';
            var video = document.createElement('video');
            video.id = 'video-' + videoNamespace;
            video.controls = true;
            video.width = 640;
            video.height = 360;
            container.appendChild(video);
            if (Hls.isSupported()) {{
                var hls = new Hls();
                hls.loadSource(videoSrc);
                hls.attachMedia(video);
                hls.on(Hls.Events.MANIFEST_PARSED, function() {{
                    video.play();
                }});
            }} else if (video.canPlayType('application/vnd.apple.mpegurl')) {{
                video.src = videoSrc;
                video.addEventListener('loadedmetadata', function() {{
                    video.play();
                }});
            }}
        }}

        fetch(statusUrl)
            .then(r => r.json())
            .then(res => {{
                if (res.ready) {{
                    attachHls();
                }} else {{
                    showWaiting();
                }}
            }})
            .catch(e => {{
                console.error(e);
                showWaiting();
            }});
    </script>
</body>
</html>
"""
            return HTML(data=html_code)
    elif method == "link":
        return hls_video_url
    else:
        raise ValueError("Invalid method")


def _is_small_int(value) -> bool:
    """Return True for a plain ``int`` (not ``bool``) that fits in a signed 16-bit word."""
    return type(value) is int and -(2**15) <= value < 2**15


def _feb_expr_coded_as_scalar(expr) -> bool:
    """Tell whether ``expr`` is written inline rather than through a pointer word.

    Filter expressions never are. Lists/tuples are only when they hold at most
    three small ints. Every other supported value (including ``SourceExpr``)
    is. Unsupported types trip the assertion.
    """
    if type(expr) is tuple:
        expr = list(expr)
    if type(expr) is FilterExpr:
        return False
    if type(expr) is list:
        if len(expr) > 3:
            return False
        return all(_is_small_int(x) for x in expr)
    assert type(expr) in [int, float, str, bytes, SourceExpr, bool, list]
    return True


class _FrameExpressionBlock:
    """Accumulates frame expressions into flat tables of 64-bit words.

    ``_exprs`` holds the encoded words (a tag in the top byte, payload below);
    the other lists are side tables that words index into. ``as_dict`` exposes
    all of them for serialization.
    """

    def __init__(self):
        self._functions = []
        self._literals = []
        self._sources = []
        self._kwarg_keys = []
        self._source_fracs = []
        self._exprs = []
        self._frame_exprs = []

    def __len__(self):
        """Number of top-level frames inserted with ``insert_frame``."""
        return len(self._frame_exprs)

    def insert_expr(self, expr):
        """Insert a frame or data expression; return the index of its first word."""
        if type(expr) is SourceExpr or type(expr) is FilterExpr:
            return self.insert_frame_expr(expr)
        return self.insert_data_expr(expr)

    def _literal_expr(self, data):
        """Store ``data`` in the literal table and return the word pointing at it."""
        self._literals.append(_json_arg(data, skip_data_anot=True))
        return 0x40000000_00000000 | (len(self._literals) - 1)

    def insert_data_expr(self, data):
        """Encode a data value and return the index of its word in ``_exprs``.

        Supports bool, int, float, str, bytes and (nested) lists/tuples of
        those. Raises ``Exception("Invalid data type")`` for anything else.
        """
        if type(data) is tuple:
            data = list(data)

        if type(data) is bool:
            self._exprs.append(0x01000000_00000000 | int(data))
        elif type(data) is int:
            if -(2**31) <= data < 2**31:
                # Fits inline as a 32-bit two's complement payload.
                self._exprs.append(data & 0xFFFFFFFF)
            else:
                self._exprs.append(self._literal_expr(data))
        elif type(data) is float:
            # Explicit big-endian packing yields the float32 bit pattern on any
            # host and avoids int.from_bytes' byteorder argument, which is
            # mandatory before Python 3.11.
            bits = struct.unpack(">I", struct.pack(">f", data))[0]
            self._exprs.append(0x02000000_00000000 | bits)
        elif type(data) is str or type(data) is bytes:
            self._exprs.append(self._literal_expr(data))
        elif type(data) is list:
            return self._insert_list_expr(data)
        else:
            raise Exception("Invalid data type")
        # Every scalar branch appended exactly one word; report its index.
        return len(self._exprs) - 1

    def _insert_list_expr(self, data):
        """Encode a list; return the index of its header (or packed) word."""
        if len(data) <= 3 and all(_is_small_int(x) for x in data):
            # Up to three small ints are packed into a single word, 16 bits
            # each, first element highest; tags 0x03..0x06 encode length 0..3.
            packed = 0
            for value in data:
                packed = (packed << 16) | (value & 0xFFFF)
            self._exprs.append(((0x03 + len(data)) << 56) | packed)
            return len(self._exprs) - 1

        # Non-inline members must be emitted before the list header so the
        # header can be followed by pointers to them.
        member_idxs = [
            None if _feb_expr_coded_as_scalar(member) else self.insert_data_expr(member)
            for member in data
        ]

        out = len(self._exprs)
        self._exprs.append(0x42000000_00000000 | len(data))
        for member, member_idx in zip(data, member_idxs):
            if member_idx is None:
                self.insert_data_expr(member)
            else:
                self._exprs.append(0x45000000_00000000 | member_idx)
        return out

    def insert_frame_expr(self, frame):
        """Encode a ``SourceExpr`` or ``FilterExpr``; return the index of its head word.

        Raises ``Exception("Invalid frame type")`` for any other type.
        """
        if type(frame) is SourceExpr:
            source = frame._source._name
            if source in self._sources:
                source_idx = self._sources.index(source)
            else:
                source_idx = len(self._sources)
                self._sources.append(source)
            if frame._is_iloc:
                self._exprs.append(
                    0x43000000_00000000 | (source_idx << 32) | frame._idx
                )
            else:
                # Timestamps live in a side table as numerator/denominator pairs.
                idx = len(self._source_fracs) // 2
                self._source_fracs.append(frame._idx.numerator)
                self._source_fracs.append(frame._idx.denominator)
                self._exprs.append(0x44000000_00000000 | (source_idx << 32) | idx)
            return len(self._exprs) - 1
        elif type(frame) is FilterExpr:
            func = frame._filter._func
            if func in self._functions:
                func_idx = self._functions.index(func)
            else:
                func_idx = len(self._functions)
                self._functions.append(func)
            len_args = len(frame._args)
            len_kwargs = len(frame._kwargs)

            # First pass: emit every non-inline argument ahead of the call
            # header and remember where it landed.
            arg_idxs = [
                None if _feb_expr_coded_as_scalar(arg) else self.insert_expr(arg)
                for arg in frame._args
            ]
            kwarg_idxs = {}
            for k, v in frame._kwargs.items():
                if _feb_expr_coded_as_scalar(v):
                    kwarg_idxs[k] = None
                else:
                    kwarg_idxs[k] = self.insert_expr(v)

            out_idx = len(self._exprs)
            self._exprs.append(
                0x41000000_00000000 | (len_args << 24) | (len_kwargs << 16) | func_idx
            )
            # Second pass: after the header, write each argument inline or as
            # a pointer to the word emitted in the first pass.
            for arg, arg_idx in zip(frame._args, arg_idxs):
                if arg_idx is None:
                    # It's a scalar
                    self.insert_expr(arg)
                else:
                    # It's an expression pointer
                    self._exprs.append(0x45000000_00000000 | arg_idx)
            for k, v in frame._kwargs.items():
                if k in self._kwarg_keys:
                    k_idx = self._kwarg_keys.index(k)
                else:
                    k_idx = len(self._kwarg_keys)
                    self._kwarg_keys.append(k)
                self._exprs.append(0x46000000_00000000 | k_idx)
                if kwarg_idxs[k] is None:
                    # It's a scalar
                    self.insert_expr(v)
                else:
                    # It's an expression pointer
                    self._exprs.append(0x45000000_00000000 | kwarg_idxs[k])
            return out_idx
        else:
            raise Exception("Invalid frame type")

    def insert_frame(self, frame):
        """Encode ``frame`` and register it as the next top-level frame."""
        idx = self.insert_frame_expr(frame)
        self._frame_exprs.append(idx)

    def as_dict(self):
        """Return the block's tables as a dict (the lists are not copied)."""
        return {
            "functions": self._functions,
            "literals": self._literals,
            "sources": self._sources,
            "kwarg_keys": self._kwarg_keys,
            "source_fracs": self._source_fracs,
            "exprs": self._exprs,
            "frame_exprs": self._frame_exprs,
        }


class Source:
    """A video source described by a server response.

    ``src`` must provide ``width``, ``height``, ``pix_fmt`` and ``ts`` (a
    sequence of ``[numerator, denominator]`` pairs). Index with a ``Fraction``
    to reference a frame by timestamp, or use ``.iloc[int]`` for positions.
    """

    def __init__(self, id: str, src):
        self._name = id
        self._fmt = {
            "width": src["width"],
            "height": src["height"],
            "pix_fmt": src["pix_fmt"],
        }
        self._ts = [Fraction(x[0], x[1]) for x in src["ts"]]
        self.iloc = _SourceILoc(self)

    def id(self) -> str:
        """Return the source identifier."""
        return self._name

    def fmt(self):
        """Return a copy of the width/height/pix_fmt dict."""
        return {**self._fmt}

    def ts(self) -> list[Fraction]:
        """Return a copy of the timestamp list."""
        return self._ts.copy()

    def __len__(self):
        return len(self._ts)

    def __getitem__(self, idx):
        if type(idx) is not Fraction:
            raise Exception("Source index must be a Fraction")
        return SourceExpr(self, idx, False)

    def __repr__(self):
        return f"Source({self._name})"


class Spec:
    """A spec described by a server response.

    ``src`` must provide ``width``, ``height``, ``pix_fmt`` and
    ``vod_endpoint``; the hls.js URL is derived from the endpoint's scheme
    and host.
    """

    def __init__(self, id: str, src):
        self._id = id
        self._fmt = {
            "width": src["width"],
            "height": src["height"],
            "pix_fmt": src["pix_fmt"],
        }
        self._vod_endpoint = src["vod_endpoint"]
        parsed_url = urlparse(self._vod_endpoint)
        self._hls_js_url = f"{parsed_url.scheme}://{parsed_url.netloc}/hls.js"

    def id(self) -> str:
        """Return the spec identifier."""
        return self._id

    def play(self, method):
        """Play the spec through ``_play`` using the given ``method``."""
        # URLs are formed by plain concatenation, so the endpoint is used
        # exactly as the server returned it.
        url = f"{self._vod_endpoint}playlist.m3u8"
        status_url = f"{self._vod_endpoint}status"
        hls_js_url = self._hls_js_url
        return _play(self._id, url, hls_js_url, method=method, status_url=status_url)


class Server:
    """HTTP client for the ``/v2`` API of a vidformer server.

    Constructing a ``Server`` performs a ``GET /v2/auth`` request. Every
    method raises ``Exception(response.text)`` on a non-OK HTTP status.
    """

    def __init__(
        self, endpoint: str, api_key: str, vod_only=False, cv2_writer_init_callback=None
    ):
        if not endpoint.startswith(("http://", "https://")):
            raise Exception("Endpoint must start with http:// or https://")
        if endpoint.endswith("/"):
            raise Exception("Endpoint must not end with /")
        self._endpoint = endpoint

        self._api_key = api_key
        self._session = requests.Session()
        self._session.headers.update({"Authorization": f"Bearer {self._api_key}"})
        self._request_ok("GET", "/v2/auth")
        self._vod_only = vod_only
        self._cv2_writer_init_callback = cv2_writer_init_callback

    def _request(self, method, path, **kwargs):
        """Send an authorized request to ``path``; raise on a non-OK status."""
        response = self._session.request(
            method,
            f"{self._endpoint}{path}",
            headers={"Authorization": f"Bearer {self._api_key}"},
            **kwargs,
        )
        if not response.ok:
            raise Exception(response.text)
        return response

    def _request_ok(self, method, path, **kwargs):
        """Like ``_request`` but decode JSON and assert ``status == "ok"``."""
        response = self._request(method, path, **kwargs).json()
        assert response["status"] == "ok"
        return response

    @staticmethod
    def _source_request_body(name, stream_idx, storage_service, storage_config):
        """Validate source parameters and build the JSON request body."""
        assert type(name) is str
        assert type(stream_idx) is int
        assert type(storage_service) is str
        assert type(storage_config) is dict
        for k, v in storage_config.items():
            assert type(k) is str
            assert type(v) is str
        return {
            "name": name,
            "stream_idx": stream_idx,
            "storage_service": storage_service,
            "storage_config": storage_config,
        }

    def is_vod_only(self) -> bool:
        return self._vod_only

    def cv2_writer_init_callback(self):
        return self._cv2_writer_init_callback

    def get_source(self, id: str) -> Source:
        """Fetch the source with the given id."""
        assert type(id) is str
        response = self._request("GET", f"/v2/source/{id}").json()
        return Source(response["id"], response)

    def list_sources(self) -> list[str]:
        """Return the decoded JSON body of ``GET /v2/source``."""
        return self._request("GET", "/v2/source").json()

    def delete_source(self, id: str):
        """Delete the source with the given id."""
        assert type(id) is str
        self._request_ok("DELETE", f"/v2/source/{id}")

    def search_source(
        self, name, stream_idx, storage_service, storage_config
    ) -> list[str]:
        """Return the decoded JSON body of ``POST /v2/source/search``."""
        req = self._source_request_body(
            name, stream_idx, storage_service, storage_config
        )
        return self._request("POST", "/v2/source/search", json=req).json()

    def create_source(
        self, name, stream_idx, storage_service, storage_config
    ) -> Source:
        """Create a source, then fetch and return it."""
        req = self._source_request_body(
            name, stream_idx, storage_service, storage_config
        )
        response = self._request_ok("POST", "/v2/source", json=req)
        return self.get_source(response["id"])

    def source(self, name, stream_idx, storage_service, storage_config) -> Source:
        """Convenience function for accessing sources.

        Tries to find a source with the given name, stream_idx, storage_service, and storage_config.
        If no source is found, creates a new source with the given parameters.
        """

        sources = self.search_source(name, stream_idx, storage_service, storage_config)
        if len(sources) == 0:
            return self.create_source(name, stream_idx, storage_service, storage_config)
        return self.get_source(sources[0])

    def get_spec(self, id: str) -> Spec:
        """Fetch the spec with the given id."""
        assert type(id) is str
        response = self._request("GET", f"/v2/spec/{id}").json()
        return Spec(response["id"], response)

    def list_specs(self) -> list[str]:
        """Return the decoded JSON body of ``GET /v2/spec``."""
        return self._request("GET", "/v2/spec").json()

    def create_spec(
        self,
        width,
        height,
        pix_fmt,
        vod_segment_length,
        frame_rate,
        ready_hook=None,
        steer_hook=None,
        ttl=None,
    ) -> Spec:
        """Create a spec, then fetch and return it.

        ``vod_segment_length`` and ``frame_rate`` must be ``Fraction``s; they
        are sent as ``[numerator, denominator]`` pairs.
        """
        assert type(width) is int
        assert type(height) is int
        assert type(pix_fmt) is str
        assert type(vod_segment_length) is Fraction
        assert type(frame_rate) is Fraction
        assert type(ready_hook) is str or ready_hook is None
        assert type(steer_hook) is str or steer_hook is None
        assert ttl is None or type(ttl) is int

        req = {
            "width": width,
            "height": height,
            "pix_fmt": pix_fmt,
            "vod_segment_length": [
                vod_segment_length.numerator,
                vod_segment_length.denominator,
            ],
            "frame_rate": [frame_rate.numerator, frame_rate.denominator],
            "ready_hook": ready_hook,
            "steer_hook": steer_hook,
            "ttl": ttl,
        }
        response = self._request_ok("POST", "/v2/spec", json=req)
        return self.get_spec(response["id"])

    def delete_spec(self, id: str):
        """Delete the spec with the given id."""
        assert type(id) is str
        self._request_ok("DELETE", f"/v2/spec/{id}")

    def export_spec(
        self, id: str, path: str, encoder=None, encoder_opts=None, format=None
    ):
        """Post an export request for spec ``id`` with the given options."""
        assert type(id) is str
        assert type(path) is str
        req = {
            "path": path,
            "encoder": encoder,
            "encoder_opts": encoder_opts,
            "format": format,
        }
        self._request_ok("POST", f"/v2/spec/{id}/export", json=req)

    def push_spec_part(self, spec_id, pos, frames, terminal):
        """Push ``(timestamp, frame_expr_or_None)`` tuples to a spec as JSON.

        ``spec_id`` may be an id string or a ``Spec``.
        """
        if type(spec_id) is Spec:
            spec_id = spec_id._id
        assert type(spec_id) is str
        assert type(pos) is int
        assert type(frames) is list
        assert type(terminal) is bool

        req_frames = []
        for frame in frames:
            assert type(frame) is tuple
            assert len(frame) == 2
            t, f = frame
            assert type(t) is Fraction
            assert f is None or type(f) is SourceExpr or type(f) is FilterExpr
            req_frames.append(
                [
                    [t.numerator, t.denominator],
                    f._to_json_spec() if f is not None else None,
                ]
            )

        req = {
            "pos": pos,
            "frames": req_frames,
            "terminal": terminal,
        }
        self._request_ok("POST", f"/v2/spec/{spec_id}/part", json=req)

    def push_spec_part_block(
        self, spec_id: str, pos, blocks, terminal, compression="gzip"
    ):
        """Push ``_FrameExpressionBlock``s to a spec.

        Each block is JSON-encoded, optionally gzip-compressed, then base64
        encoded. ``spec_id`` may be an id string or a ``Spec``.
        """
        if type(spec_id) is Spec:
            spec_id = spec_id._id
        assert type(spec_id) is str
        assert type(pos) is int
        assert type(blocks) is list
        assert type(terminal) is bool
        assert compression is None or compression == "gzip"

        req_blocks = []
        for block in blocks:
            assert type(block) is _FrameExpressionBlock
            block_body = block.as_dict()
            block_frames = len(block_body["frame_exprs"])
            block_body = json.dumps(block_body).encode("utf-8")
            if compression == "gzip":
                block_body = gzip.compress(block_body, 1)
            block_body = base64.b64encode(block_body).decode("utf-8")
            req_blocks.append(
                {
                    "frames": block_frames,
                    "compression": compression,
                    "body": block_body,
                }
            )

        req = {
            "pos": pos,
            "terminal": terminal,
            "blocks": req_blocks,
        }
        self._request_ok("POST", f"/v2/spec/{spec_id}/part_block", json=req)

    def frame(self, width, height, pix_fmt, frame_expr, compression="gzip"):
        """Post a single frame expression to ``/v2/frame`` and return the response bytes.

        With ``compression="gzip"`` the response body is gunzipped before it
        is returned.
        """
        assert type(frame_expr) is FilterExpr or type(frame_expr) is SourceExpr
        assert compression is None or compression in ["gzip"]
        feb = _FrameExpressionBlock()
        feb.insert_frame(frame_expr)
        feb_body = feb.as_dict()

        feb_body = json.dumps(feb_body).encode("utf-8")
        if compression == "gzip":
            feb_body = gzip.compress(feb_body, 1)
        feb_body = base64.b64encode(feb_body).decode("utf-8")
        req = {
            "width": width,
            "height": height,
            "pix_fmt": pix_fmt,
            "compression": compression,
            "block": {
                "frames": 1,
                "compression": compression,
                "body": feb_body,
            },
        }
        response = self._request("POST", "/v2/frame", json=req)
        response_body = response.content
        assert type(response_body) is bytes
        if compression == "gzip":
            response_body = gzip.decompress(response_body)
        return response_body


class SourceExpr:
    """A reference to one frame of a ``Source``, by position (iloc) or timestamp."""

    def __init__(self, source, idx, is_iloc):
        self._source = source
        self._idx = idx
        self._is_iloc = is_iloc

    def __repr__(self):
        if self._is_iloc:
            return f"{self._source._name}.iloc[{self._idx}]"
        else:
            return f"{self._source._name}[{self._idx}]"

    def _to_json_spec(self):
        """Return the JSON-ready dict form of this frame reference."""
        if self._is_iloc:
            index = {"ILoc": int(self._idx)}
        else:
            index = {"T": [self._idx.numerator, self._idx.denominator]}
        return {"Source": {"video": self._source._name, "index": index}}

    def _sources(self):
        return {self._source}

    def _filters(self):
        return {}


class _SourceILoc:
    """Positional indexer exposed as ``Source.iloc``."""

    def __init__(self, source):
        self._source = source

    def __getitem__(self, idx):
        if type(idx) is not int:
            raise Exception(f"Source iloc index must be an integer, got a {type(idx)}")
        return SourceExpr(self._source, idx, True)


def _json_arg(arg, skip_data_anot=False):
    """Convert a filter argument to its tagged JSON form.

    Frame expressions become ``{"Frame": ...}``. Data values become
    ``{"<Type>": value}``, wrapped in ``{"Data": ...}`` unless
    ``skip_data_anot`` is true. List members are never wrapped. Raises
    ``Exception`` for unsupported types.
    """
    if type(arg) is FilterExpr or type(arg) is SourceExpr:
        return {"Frame": arg._to_json_spec()}

    if type(arg) is int:
        data = {"Int": arg}
    elif type(arg) is str:
        data = {"String": arg}
    elif type(arg) is bytes:
        data = {"Bytes": list(arg)}
    elif type(arg) is float:
        data = {"Float": arg}
    elif type(arg) is bool:
        data = {"Bool": arg}
    elif type(arg) is tuple or type(arg) is list:
        data = {"List": [_json_arg(x, True) for x in arg]}
    else:
        raise Exception(f"Unknown arg type: {type(arg)}")
    return data if skip_data_anot else {"Data": data}


class Filter:
    """A video filter."""

    def __init__(self, func: str):
        self._func = func

    def __call__(self, *args, **kwargs):
        return FilterExpr(self, args, kwargs)


class FilterExpr:
    """The application of a ``Filter`` to positional and keyword arguments."""

    def __init__(self, filter: Filter, args, kwargs):
        self._filter = filter
        self._args = args
        self._kwargs = kwargs

    def __repr__(self):
        args = []
        for arg in self._args:
            val = f'"{arg}"' if type(arg) is str else str(arg)
            args.append(str(val))
        for k, v in self._kwargs.items():
            val = f'"{v}"' if type(v) is str else str(v)
            args.append(f"{k}={val}")
        return f"{self._filter._func}({', '.join(args)})"

    def _to_json_spec(self):
        """Return the JSON-ready dict form of this filter call."""
        args = [_json_arg(arg) for arg in self._args]
        kwargs = {k: _json_arg(v) for k, v in self._kwargs.items()}
        return {"Filter": {"name": self._filter._func, "args": args, "kwargs": kwargs}}

    def _sources(self):
        """Collect the ``Source`` objects referenced by direct frame arguments, recursively."""
        s = set()
        for arg in self._args:
            if type(arg) is FilterExpr or type(arg) is SourceExpr:
                s = s.union(arg._sources())
        for arg in self._kwargs.values():
            if type(arg) is FilterExpr or type(arg) is SourceExpr:
                s = s.union(arg._sources())
        return s

    def _filters(self):
        """Map filter name to ``Filter`` for this call and nested filter arguments."""
        f = {self._filter._func: self._filter}
        for arg in self._args:
            if type(arg) is FilterExpr:
                f = {**f, **arg._filters()}
        for arg in self._kwargs.values():
            if type(arg) is FilterExpr:
                f = {**f, **arg._filters()}
        return f
class
Source:
class Source:
    """A video source described by a server response.

    ``src`` must provide ``width``, ``height``, ``pix_fmt`` and ``ts`` (a
    sequence of ``[numerator, denominator]`` pairs). Indexing with a
    ``Fraction`` yields a timestamp-based ``SourceExpr``; positional access
    goes through the ``iloc`` attribute.
    """

    def __init__(self, id: str, src):
        self._name = id
        # Only the three format keys are kept from the response.
        self._fmt = {key: src[key] for key in ("width", "height", "pix_fmt")}
        self._ts = [Fraction(pair[0], pair[1]) for pair in src["ts"]]
        self.iloc = _SourceILoc(self)

    def id(self) -> str:
        """Return the source identifier."""
        return self._name

    def fmt(self):
        """Return a fresh copy of the width/height/pix_fmt dict."""
        return dict(self._fmt)

    def ts(self) -> list[Fraction]:
        """Return a fresh copy of the timestamp list."""
        return list(self._ts)

    def __len__(self):
        return len(self._ts)

    def __getitem__(self, idx):
        if type(idx) is Fraction:
            return SourceExpr(self, idx, False)
        raise Exception("Source index must be a Fraction")

    def __repr__(self):
        return f"Source({self._name})"
class
Spec:
class Spec:
    """A spec described by a server response.

    ``src`` must provide ``width``, ``height``, ``pix_fmt`` and
    ``vod_endpoint``. The hls.js URL is built from the endpoint's scheme and
    network location.
    """

    def __init__(self, id: str, src):
        self._id = id
        self._fmt = {key: src[key] for key in ("width", "height", "pix_fmt")}
        endpoint = src["vod_endpoint"]
        self._vod_endpoint = endpoint
        parts = urlparse(endpoint)
        self._hls_js_url = f"{parts.scheme}://{parts.netloc}/hls.js"

    def id(self) -> str:
        """Return the spec identifier."""
        return self._id

    def play(self, method):
        """Play the spec through ``_play`` using the given ``method``."""
        # Playlist and status URLs are plain concatenations onto the endpoint.
        return _play(
            self._id,
            f"{self._vod_endpoint}playlist.m3u8",
            self._hls_js_url,
            method=method,
            status_url=f"{self._vod_endpoint}status",
        )
Spec(id: str, src)
432 def __init__(self, id: str, src): 433 self._id = id 434 self._fmt = { 435 "width": src["width"], 436 "height": src["height"], 437 "pix_fmt": src["pix_fmt"], 438 } 439 self._vod_endpoint = src["vod_endpoint"] 440 parsed_url = urlparse(self._vod_endpoint) 441 self._hls_js_url = f"{parsed_url.scheme}://{parsed_url.netloc}/hls.js"
class
Server:
453class Server: 454 def __init__( 455 self, endpoint: str, api_key: str, vod_only=False, cv2_writer_init_callback=None 456 ): 457 if not endpoint.startswith("http://") and not endpoint.startswith("https://"): 458 raise Exception("Endpoint must start with http:// or https://") 459 if endpoint.endswith("/"): 460 raise Exception("Endpoint must not end with /") 461 self._endpoint = endpoint 462 463 self._api_key = api_key 464 self._session = requests.Session() 465 self._session.headers.update({"Authorization": f"Bearer {self._api_key}"}) 466 response = self._session.get( 467 f"{self._endpoint}/v2/auth", 468 headers={"Authorization": f"Bearer {self._api_key}"}, 469 ) 470 if not response.ok: 471 raise Exception(response.text) 472 response = response.json() 473 assert response["status"] == "ok" 474 self._vod_only = vod_only 475 self._cv2_writer_init_callback = cv2_writer_init_callback 476 477 def is_vod_only(self) -> bool: 478 return self._vod_only 479 480 def cv2_writer_init_callback(self): 481 return self._cv2_writer_init_callback 482 483 def get_source(self, id: str) -> Source: 484 assert type(id) is str 485 response = self._session.get( 486 f"{self._endpoint}/v2/source/{id}", 487 headers={"Authorization": f"Bearer {self._api_key}"}, 488 ) 489 if not response.ok: 490 raise Exception(response.text) 491 response = response.json() 492 return Source(response["id"], response) 493 494 def list_sources(self) -> list[str]: 495 response = self._session.get( 496 f"{self._endpoint}/v2/source", 497 headers={"Authorization": f"Bearer {self._api_key}"}, 498 ) 499 if not response.ok: 500 raise Exception(response.text) 501 response = response.json() 502 return response 503 504 def delete_source(self, id: str): 505 assert type(id) is str 506 response = self._session.delete( 507 f"{self._endpoint}/v2/source/{id}", 508 headers={"Authorization": f"Bearer {self._api_key}"}, 509 ) 510 if not response.ok: 511 raise Exception(response.text) 512 response = response.json() 513 assert 
response["status"] == "ok" 514 515 def search_source( 516 self, name, stream_idx, storage_service, storage_config 517 ) -> list[str]: 518 assert type(name) is str 519 assert type(stream_idx) is int 520 assert type(storage_service) is str 521 assert type(storage_config) is dict 522 for k, v in storage_config.items(): 523 assert type(k) is str 524 assert type(v) is str 525 req = { 526 "name": name, 527 "stream_idx": stream_idx, 528 "storage_service": storage_service, 529 "storage_config": storage_config, 530 } 531 response = self._session.post( 532 f"{self._endpoint}/v2/source/search", 533 json=req, 534 headers={"Authorization": f"Bearer {self._api_key}"}, 535 ) 536 if not response.ok: 537 raise Exception(response.text) 538 response = response.json() 539 return response 540 541 def create_source( 542 self, name, stream_idx, storage_service, storage_config 543 ) -> Source: 544 assert type(name) is str 545 assert type(stream_idx) is int 546 assert type(storage_service) is str 547 assert type(storage_config) is dict 548 for k, v in storage_config.items(): 549 assert type(k) is str 550 assert type(v) is str 551 req = { 552 "name": name, 553 "stream_idx": stream_idx, 554 "storage_service": storage_service, 555 "storage_config": storage_config, 556 } 557 response = self._session.post( 558 f"{self._endpoint}/v2/source", 559 json=req, 560 headers={"Authorization": f"Bearer {self._api_key}"}, 561 ) 562 if not response.ok: 563 raise Exception(response.text) 564 response = response.json() 565 assert response["status"] == "ok" 566 id = response["id"] 567 return self.get_source(id) 568 569 def source(self, name, stream_idx, storage_service, storage_config) -> Source: 570 """Convenience function for accessing sources. 571 572 Tries to find a source with the given name, stream_idx, storage_service, and storage_config. 573 If no source is found, creates a new source with the given parameters. 
574 """ 575 576 sources = self.search_source(name, stream_idx, storage_service, storage_config) 577 if len(sources) == 0: 578 return self.create_source(name, stream_idx, storage_service, storage_config) 579 return self.get_source(sources[0]) 580 581 def get_spec(self, id: str) -> Spec: 582 assert type(id) is str 583 response = self._session.get( 584 f"{self._endpoint}/v2/spec/{id}", 585 headers={"Authorization": f"Bearer {self._api_key}"}, 586 ) 587 if not response.ok: 588 raise Exception(response.text) 589 response = response.json() 590 return Spec(response["id"], response) 591 592 def list_specs(self) -> list[str]: 593 response = self._session.get( 594 f"{self._endpoint}/v2/spec", 595 headers={"Authorization": f"Bearer {self._api_key}"}, 596 ) 597 if not response.ok: 598 raise Exception(response.text) 599 response = response.json() 600 return response 601 602 def create_spec( 603 self, 604 width, 605 height, 606 pix_fmt, 607 vod_segment_length, 608 frame_rate, 609 ready_hook=None, 610 steer_hook=None, 611 ttl=None, 612 ) -> Spec: 613 assert type(width) is int 614 assert type(height) is int 615 assert type(pix_fmt) is str 616 assert type(vod_segment_length) is Fraction 617 assert type(frame_rate) is Fraction 618 assert type(ready_hook) is str or ready_hook is None 619 assert type(steer_hook) is str or steer_hook is None 620 assert ttl is None or type(ttl) is int 621 622 req = { 623 "width": width, 624 "height": height, 625 "pix_fmt": pix_fmt, 626 "vod_segment_length": [ 627 vod_segment_length.numerator, 628 vod_segment_length.denominator, 629 ], 630 "frame_rate": [frame_rate.numerator, frame_rate.denominator], 631 "ready_hook": ready_hook, 632 "steer_hook": steer_hook, 633 "ttl": ttl, 634 } 635 response = self._session.post( 636 f"{self._endpoint}/v2/spec", 637 json=req, 638 headers={"Authorization": f"Bearer {self._api_key}"}, 639 ) 640 if not response.ok: 641 raise Exception(response.text) 642 response = response.json() 643 assert response["status"] == "ok" 644 
return self.get_spec(response["id"]) 645 646 def delete_spec(self, id: str): 647 assert type(id) is str 648 response = self._session.delete( 649 f"{self._endpoint}/v2/spec/{id}", 650 headers={"Authorization": f"Bearer {self._api_key}"}, 651 ) 652 if not response.ok: 653 raise Exception(response.text) 654 response = response.json() 655 assert response["status"] == "ok" 656 657 def export_spec( 658 self, id: str, path: str, encoder=None, encoder_opts=None, format=None 659 ): 660 assert type(id) is str 661 assert type(path) is str 662 req = { 663 "path": path, 664 "encoder": encoder, 665 "encoder_opts": encoder_opts, 666 "format": format, 667 } 668 response = self._session.post( 669 f"{self._endpoint}/v2/spec/{id}/export", 670 json=req, 671 headers={"Authorization": f"Bearer {self._api_key}"}, 672 ) 673 if not response.ok: 674 raise Exception(response.text) 675 response = response.json() 676 assert response["status"] == "ok" 677 678 def push_spec_part(self, spec_id, pos, frames, terminal): 679 if type(spec_id) is Spec: 680 spec_id = spec_id._id 681 assert type(spec_id) is str 682 assert type(pos) is int 683 assert type(frames) is list 684 assert type(terminal) is bool 685 686 req_frames = [] 687 for frame in frames: 688 assert type(frame) is tuple 689 assert len(frame) == 2 690 t = frame[0] 691 f = frame[1] 692 assert type(t) is Fraction 693 assert f is None or type(f) is SourceExpr or type(f) is FilterExpr 694 req_frames.append( 695 [ 696 [t.numerator, t.denominator], 697 f._to_json_spec() if f is not None else None, 698 ] 699 ) 700 701 req = { 702 "pos": pos, 703 "frames": req_frames, 704 "terminal": terminal, 705 } 706 response = self._session.post( 707 f"{self._endpoint}/v2/spec/{spec_id}/part", 708 json=req, 709 headers={"Authorization": f"Bearer {self._api_key}"}, 710 ) 711 if not response.ok: 712 raise Exception(response.text) 713 response = response.json() 714 assert response["status"] == "ok" 715 716 def push_spec_part_block( 717 self, spec_id: str, pos, 
blocks, terminal, compression="gzip" 718 ): 719 if type(spec_id) is Spec: 720 spec_id = spec_id._id 721 assert type(spec_id) is str 722 assert type(pos) is int 723 assert type(blocks) is list 724 assert type(terminal) is bool 725 assert compression is None or compression == "gzip" 726 727 req_blocks = [] 728 for block in blocks: 729 assert type(block) is _FrameExpressionBlock 730 block_body = block.as_dict() 731 block_frames = len(block_body["frame_exprs"]) 732 block_body = json.dumps(block_body).encode("utf-8") 733 if compression == "gzip": 734 block_body = gzip.compress(block_body, 1) 735 block_body = base64.b64encode(block_body).decode("utf-8") 736 req_blocks.append( 737 { 738 "frames": block_frames, 739 "compression": compression, 740 "body": block_body, 741 } 742 ) 743 744 req = { 745 "pos": pos, 746 "terminal": terminal, 747 "blocks": req_blocks, 748 } 749 response = self._session.post( 750 f"{self._endpoint}/v2/spec/{spec_id}/part_block", 751 json=req, 752 headers={"Authorization": f"Bearer {self._api_key}"}, 753 ) 754 if not response.ok: 755 raise Exception(response.text) 756 response = response.json() 757 assert response["status"] == "ok" 758 759 def frame(self, width, height, pix_fmt, frame_expr, compression="gzip"): 760 assert type(frame_expr) is FilterExpr or type(frame_expr) is SourceExpr 761 assert compression is None or compression in ["gzip"] 762 feb = _FrameExpressionBlock() 763 feb.insert_frame(frame_expr) 764 feb_body = feb.as_dict() 765 766 feb_body = json.dumps(feb_body).encode("utf-8") 767 if compression == "gzip": 768 feb_body = gzip.compress(feb_body, 1) 769 feb_body = base64.b64encode(feb_body).decode("utf-8") 770 req = { 771 "width": width, 772 "height": height, 773 "pix_fmt": pix_fmt, 774 "compression": compression, 775 "block": { 776 "frames": 1, 777 "compression": compression, 778 "body": feb_body, 779 }, 780 } 781 response = self._session.post( 782 f"{self._endpoint}/v2/frame", 783 json=req, 784 headers={"Authorization": f"Bearer 
{self._api_key}"}, 785 ) 786 if not response.ok: 787 raise Exception(response.text) 788 response_body = response.content 789 assert type(response_body) is bytes 790 if compression == "gzip": 791 response_body = gzip.decompress(response_body) 792 return response_body
Server(endpoint: str, api_key: str, vod_only=False, cv2_writer_init_callback=None)
def __init__(
    self, endpoint: str, api_key: str, vod_only=False, cv2_writer_init_callback=None
):
    """Set up a session against a server endpoint and verify the API key.

    `endpoint` must begin with ``http://`` or ``https://`` and must not end
    with ``/``; otherwise an Exception is raised before any request is made.
    The constructor then issues ``GET {endpoint}/v2/auth`` with a bearer
    token built from `api_key`, raises an Exception carrying the response
    text when the response is not ok, and asserts that the JSON reply has
    ``status == "ok"``.

    `vod_only` and `cv2_writer_init_callback` are stored on the instance
    unchanged, after the auth check has succeeded.
    """
    has_scheme = endpoint.startswith("http://") or endpoint.startswith("https://")
    if not has_scheme:
        raise Exception("Endpoint must start with http:// or https://")
    if endpoint.endswith("/"):
        raise Exception("Endpoint must not end with /")
    self._endpoint = endpoint
    self._api_key = api_key

    # The bearer token is installed on the session and also sent explicitly
    # with the auth probe.
    bearer = {"Authorization": f"Bearer {self._api_key}"}
    self._session = requests.Session()
    self._session.headers.update(bearer)

    auth_reply = self._session.get(f"{self._endpoint}/v2/auth", headers=bearer)
    if not auth_reply.ok:
        raise Exception(auth_reply.text)
    auth_body = auth_reply.json()
    assert auth_body["status"] == "ok"

    self._vod_only = vod_only
    self._cv2_writer_init_callback = cv2_writer_init_callback
def get_source(self, id: str) -> Source:
    """Fetch a source by id.

    Issues ``GET {endpoint}/v2/source/{id}``. Raises an Exception carrying
    the response text when the response is not ok. Returns a `Source` built
    from the ``id`` field of the JSON reply and the full JSON reply.
    """
    assert type(id) is str
    url = f"{self._endpoint}/v2/source/{id}"
    bearer = {"Authorization": f"Bearer {self._api_key}"}
    reply = self._session.get(url, headers=bearer)
    if not reply.ok:
        raise Exception(reply.text)
    payload = reply.json()
    return Source(payload["id"], payload)
def delete_source(self, id: str):
def delete_source(self, id: str):
    """Delete a source by id.

    Issues ``DELETE {endpoint}/v2/source/{id}``. Raises an Exception
    carrying the response text when the response is not ok, and asserts
    that the JSON reply has ``status == "ok"``. Returns None.
    """
    assert type(id) is str
    url = f"{self._endpoint}/v2/source/{id}"
    bearer = {"Authorization": f"Bearer {self._api_key}"}
    reply = self._session.delete(url, headers=bearer)
    if not reply.ok:
        raise Exception(reply.text)
    outcome = reply.json()
    assert outcome["status"] == "ok"
def search_source(self, name, stream_idx, storage_service, storage_config) -> list[str]:
def search_source(
    self, name, stream_idx, storage_service, storage_config
) -> list[str]:
    """Search the server for sources matching the given parameters.

    `name` and `storage_service` must be str, `stream_idx` an int, and
    `storage_config` a dict whose keys and values are all str. Posts these
    four fields as JSON to ``{endpoint}/v2/source/search``. Raises an
    Exception carrying the response text when the response is not ok.
    Returns the decoded JSON reply unchanged.
    """
    assert type(name) is str
    assert type(stream_idx) is int
    assert type(storage_service) is str
    assert type(storage_config) is dict
    assert all(
        type(key) is str and type(value) is str
        for key, value in storage_config.items()
    )

    query = {
        "name": name,
        "stream_idx": stream_idx,
        "storage_service": storage_service,
        "storage_config": storage_config,
    }
    reply = self._session.post(
        f"{self._endpoint}/v2/source/search",
        json=query,
        headers={"Authorization": f"Bearer {self._api_key}"},
    )
    if not reply.ok:
        raise Exception(reply.text)
    return reply.json()
def create_source(
    self, name, stream_idx, storage_service, storage_config
) -> Source:
    """Register a new source on the server and return it.

    `name` and `storage_service` must be str, `stream_idx` an int, and
    `storage_config` a dict whose keys and values are all str. Posts these
    four fields as JSON to ``{endpoint}/v2/source``. Raises an Exception
    carrying the response text when the response is not ok, and asserts
    that the JSON reply has ``status == "ok"``. The ``id`` from the reply is
    then passed to `get_source`, whose result is returned.
    """
    assert type(name) is str
    assert type(stream_idx) is int
    assert type(storage_service) is str
    assert type(storage_config) is dict
    assert all(
        type(key) is str and type(value) is str
        for key, value in storage_config.items()
    )

    body = {
        "name": name,
        "stream_idx": stream_idx,
        "storage_service": storage_service,
        "storage_config": storage_config,
    }
    reply = self._session.post(
        f"{self._endpoint}/v2/source",
        json=body,
        headers={"Authorization": f"Bearer {self._api_key}"},
    )
    if not reply.ok:
        raise Exception(reply.text)
    created = reply.json()
    assert created["status"] == "ok"
    return self.get_source(created["id"])
def source(self, name, stream_idx, storage_service, storage_config) -> Source:
    """Convenience function for accessing sources.

    Looks up sources matching name, stream_idx, storage_service, and
    storage_config via `search_source`. When at least one match exists, the
    first match is fetched with `get_source`; otherwise a new source is
    created with `create_source` using the same parameters.
    """
    matches = self.search_source(name, stream_idx, storage_service, storage_config)
    if len(matches) != 0:
        return self.get_source(matches[0])
    return self.create_source(name, stream_idx, storage_service, storage_config)
Convenience function for accessing sources.
Tries to find a source with the given name, stream_idx, storage_service, and storage_config. If no source is found, creates a new source with the given parameters.
def get_spec(self, id: str) -> Spec:
    """Fetch a spec by id.

    Issues ``GET {endpoint}/v2/spec/{id}``. Raises an Exception carrying the
    response text when the response is not ok. Returns a `Spec` built from
    the ``id`` field of the JSON reply and the full JSON reply.
    """
    assert type(id) is str
    url = f"{self._endpoint}/v2/spec/{id}"
    bearer = {"Authorization": f"Bearer {self._api_key}"}
    reply = self._session.get(url, headers=bearer)
    if not reply.ok:
        raise Exception(reply.text)
    payload = reply.json()
    return Spec(payload["id"], payload)
def create_spec(self, width, height, pix_fmt, vod_segment_length, frame_rate, ready_hook=None, steer_hook=None, ttl=None) -> Spec:
def create_spec(
    self,
    width,
    height,
    pix_fmt,
    vod_segment_length,
    frame_rate,
    ready_hook=None,
    steer_hook=None,
    ttl=None,
) -> Spec:
    """Create a spec on the server and return it.

    `width`, `height` must be int, `pix_fmt` a str, and `vod_segment_length`
    and `frame_rate` `Fraction` values, which are sent as
    ``[numerator, denominator]`` pairs. `ready_hook` and `steer_hook` are
    optional str values and `ttl` an optional int; all three are forwarded
    as given.

    Posts the request as JSON to ``{endpoint}/v2/spec``. Raises an Exception
    carrying the response text when the response is not ok, and asserts
    that the JSON reply has ``status == "ok"``. The ``id`` from the reply is
    then passed to `get_spec`, whose result is returned.
    """
    assert type(width) is int
    assert type(height) is int
    assert type(pix_fmt) is str
    assert type(vod_segment_length) is Fraction
    assert type(frame_rate) is Fraction
    assert ready_hook is None or type(ready_hook) is str
    assert steer_hook is None or type(steer_hook) is str
    assert ttl is None or type(ttl) is int

    segment_pair = [vod_segment_length.numerator, vod_segment_length.denominator]
    rate_pair = [frame_rate.numerator, frame_rate.denominator]
    body = {
        "width": width,
        "height": height,
        "pix_fmt": pix_fmt,
        "vod_segment_length": segment_pair,
        "frame_rate": rate_pair,
        "ready_hook": ready_hook,
        "steer_hook": steer_hook,
        "ttl": ttl,
    }
    reply = self._session.post(
        f"{self._endpoint}/v2/spec",
        json=body,
        headers={"Authorization": f"Bearer {self._api_key}"},
    )
    if not reply.ok:
        raise Exception(reply.text)
    created = reply.json()
    assert created["status"] == "ok"
    return self.get_spec(created["id"])
def delete_spec(self, id: str):
def delete_spec(self, id: str):
    """Delete a spec by id.

    Issues ``DELETE {endpoint}/v2/spec/{id}``. Raises an Exception carrying
    the response text when the response is not ok, and asserts that the
    JSON reply has ``status == "ok"``. Returns None.
    """
    assert type(id) is str
    url = f"{self._endpoint}/v2/spec/{id}"
    bearer = {"Authorization": f"Bearer {self._api_key}"}
    reply = self._session.delete(url, headers=bearer)
    if not reply.ok:
        raise Exception(reply.text)
    outcome = reply.json()
    assert outcome["status"] == "ok"
def export_spec(self, id: str, path: str, encoder=None, encoder_opts=None, format=None):
def export_spec(
    self, id: str, path: str, encoder=None, encoder_opts=None, format=None
):
    """Ask the server to export a spec.

    `id` and `path` must be str. Posts `path`, `encoder`, `encoder_opts` and
    `format` as JSON to ``{endpoint}/v2/spec/{id}/export``; the optional
    values are forwarded as given. Raises an Exception carrying the
    response text when the response is not ok, and asserts that the JSON
    reply has ``status == "ok"``. Returns None.
    """
    assert type(id) is str
    assert type(path) is str

    body = dict(path=path, encoder=encoder, encoder_opts=encoder_opts, format=format)
    reply = self._session.post(
        f"{self._endpoint}/v2/spec/{id}/export",
        json=body,
        headers={"Authorization": f"Bearer {self._api_key}"},
    )
    if not reply.ok:
        raise Exception(reply.text)
    outcome = reply.json()
    assert outcome["status"] == "ok"
def push_spec_part(self, spec_id, pos, frames, terminal):
def push_spec_part(self, spec_id, pos, frames, terminal):
    """Push a part (a list of timestamped frames) onto a spec.

    `spec_id` may be a str id or a `Spec`, in which case its ``_id`` is
    used. `pos` must be an int, `terminal` a bool, and `frames` a list of
    2-tuples ``(t, f)`` where ``t`` is a `Fraction` and ``f`` is None, a
    `SourceExpr`, or a `FilterExpr`.

    Each frame is sent as ``[[numerator, denominator], spec]`` where
    ``spec`` is ``f._to_json_spec()`` or None. The request is posted as JSON
    to ``{endpoint}/v2/spec/{spec_id}/part``. Raises an Exception carrying
    the response text when the response is not ok, and asserts that the
    JSON reply has ``status == "ok"``. Returns None.
    """
    if type(spec_id) is Spec:
        spec_id = spec_id._id
    assert type(spec_id) is str
    assert type(pos) is int
    assert type(frames) is list
    assert type(terminal) is bool

    encoded_frames = []
    for entry in frames:
        assert type(entry) is tuple
        assert len(entry) == 2
        timestamp, expr = entry
        assert type(timestamp) is Fraction
        assert expr is None or type(expr) is SourceExpr or type(expr) is FilterExpr
        encoded_frames.append(
            [
                [timestamp.numerator, timestamp.denominator],
                None if expr is None else expr._to_json_spec(),
            ]
        )

    reply = self._session.post(
        f"{self._endpoint}/v2/spec/{spec_id}/part",
        json={"pos": pos, "frames": encoded_frames, "terminal": terminal},
        headers={"Authorization": f"Bearer {self._api_key}"},
    )
    if not reply.ok:
        raise Exception(reply.text)
    outcome = reply.json()
    assert outcome["status"] == "ok"
def push_spec_part_block(self, spec_id: str, pos, blocks, terminal, compression='gzip'):
def push_spec_part_block(
    self, spec_id: str, pos, blocks, terminal, compression="gzip"
):
    """Push a part made of frame-expression blocks onto a spec.

    `spec_id` may be a str id or a `Spec`, in which case its ``_id`` is
    used. `pos` must be an int, `terminal` a bool, `blocks` a list of
    `_FrameExpressionBlock`, and `compression` either None or ``"gzip"``.

    Each block's ``as_dict()`` is JSON-encoded as UTF-8, gzip-compressed at
    level 1 when `compression` is ``"gzip"``, and base64-encoded. It is sent
    together with its frame count (the length of ``frame_exprs``) and the
    compression name. The request is posted as JSON to
    ``{endpoint}/v2/spec/{spec_id}/part_block``. Raises an Exception
    carrying the response text when the response is not ok, and asserts
    that the JSON reply has ``status == "ok"``. Returns None.
    """
    if type(spec_id) is Spec:
        spec_id = spec_id._id
    assert type(spec_id) is str
    assert type(pos) is int
    assert type(blocks) is list
    assert type(terminal) is bool
    assert compression is None or compression == "gzip"

    encoded_blocks = []
    for blk in blocks:
        assert type(blk) is _FrameExpressionBlock
        as_dict = blk.as_dict()
        n_frames = len(as_dict["frame_exprs"])
        raw = json.dumps(as_dict).encode("utf-8")
        if compression == "gzip":
            raw = gzip.compress(raw, 1)
        encoded_blocks.append(
            {
                "frames": n_frames,
                "compression": compression,
                "body": base64.b64encode(raw).decode("utf-8"),
            }
        )

    reply = self._session.post(
        f"{self._endpoint}/v2/spec/{spec_id}/part_block",
        json={"pos": pos, "terminal": terminal, "blocks": encoded_blocks},
        headers={"Authorization": f"Bearer {self._api_key}"},
    )
    if not reply.ok:
        raise Exception(reply.text)
    outcome = reply.json()
    assert outcome["status"] == "ok"
def frame(self, width, height, pix_fmt, frame_expr, compression='gzip'):
def frame(self, width, height, pix_fmt, frame_expr, compression="gzip"):
    """Request a single frame from the server for one frame expression.

    `frame_expr` must be a `FilterExpr` or `SourceExpr`; `compression` must
    be None or ``"gzip"``. The expression is placed in a fresh
    `_FrameExpressionBlock`, whose ``as_dict()`` is JSON-encoded as UTF-8,
    gzip-compressed at level 1 when `compression` is ``"gzip"``, and
    base64-encoded. It is posted with `width`, `height`, `pix_fmt` and
    `compression` to ``{endpoint}/v2/frame``.

    Raises an Exception carrying the response text when the response is not
    ok. Returns the response content as bytes, gzip-decompressed when
    `compression` is ``"gzip"``.
    """
    assert type(frame_expr) is FilterExpr or type(frame_expr) is SourceExpr
    assert compression is None or compression in ["gzip"]
    gzipped = compression == "gzip"

    block = _FrameExpressionBlock()
    block.insert_frame(frame_expr)
    raw = json.dumps(block.as_dict()).encode("utf-8")
    if gzipped:
        raw = gzip.compress(raw, 1)

    # The compression name is sent both at the top level and on the block.
    body = {
        "width": width,
        "height": height,
        "pix_fmt": pix_fmt,
        "compression": compression,
        "block": {
            "frames": 1,
            "compression": compression,
            "body": base64.b64encode(raw).decode("utf-8"),
        },
    }
    reply = self._session.post(
        f"{self._endpoint}/v2/frame",
        json=body,
        headers={"Authorization": f"Bearer {self._api_key}"},
    )
    if not reply.ok:
        raise Exception(reply.text)

    content = reply.content
    assert type(content) is bytes
    return gzip.decompress(content) if gzipped else content
class SourceExpr:
class SourceExpr:
    """A reference to one frame of a source, by position or by timestamp.

    Holds the source object, an index, and a flag saying whether the index
    is positional (``iloc``) or a timestamp with ``numerator`` and
    ``denominator`` attributes.
    """

    def __init__(self, source, idx, is_iloc):
        self._source = source
        self._idx = idx
        self._is_iloc = is_iloc

    def __repr__(self):
        name = self._source._name
        accessor = ".iloc" if self._is_iloc else ""
        return f"{name}{accessor}[{self._idx}]"

    def _to_json_spec(self):
        """Return the JSON-serializable form of this expression."""
        video = self._source._name
        if self._is_iloc:
            index = {"ILoc": int(self._idx)}
        else:
            index = {"T": [self._idx.numerator, self._idx.denominator]}
        return {"Source": {"video": video, "index": index}}

    def _sources(self):
        """Return the set containing the single source this refers to."""
        return {self._source}

    def _filters(self):
        """Return an empty dict: a source expression uses no filters."""
        return dict()
class Filter:
class Filter:
    """A video filter.

    Wraps the filter's function name. Calling an instance does not run
    anything; it builds a `FilterExpr` recording this filter together with
    the positional and keyword arguments it was called with.
    """

    def __init__(self, func: str):
        self._func = func

    def __call__(self, *args, **kwargs):
        return FilterExpr(filter=self, args=args, kwargs=kwargs)
A video filter.
class FilterExpr:
class FilterExpr:
    """The application of a `Filter` to positional and keyword arguments.

    Arguments may themselves be `FilterExpr` or `SourceExpr` values, which
    makes this a node in an expression tree; any other argument is treated
    as a plain value.
    """

    def __init__(self, filter: Filter, args, kwargs):
        self._filter = filter
        self._args = args
        self._kwargs = kwargs

    @staticmethod
    def _format_arg(value):
        """Render one argument for `__repr__`.

        Strings are rendered as double-quoted literals with quotes,
        backslashes and control characters escaped, so that a string
        containing ``"`` or a newline cannot be mistaken for several
        arguments. Strings without such characters render exactly as
        ``"text"``. Every other value is rendered with ``str``.
        """
        if type(value) is str:
            return json.dumps(value, ensure_ascii=False)
        return str(value)

    def __repr__(self):
        rendered = [self._format_arg(arg) for arg in self._args]
        rendered.extend(
            f"{key}={self._format_arg(value)}" for key, value in self._kwargs.items()
        )
        return f"{self._filter._func}({', '.join(rendered)})"

    def _to_json_spec(self):
        """Return the JSON-serializable form of this expression.

        Each positional and keyword argument is converted with `_json_arg`.
        """
        args = [_json_arg(arg) for arg in self._args]
        kwargs = {key: _json_arg(value) for key, value in self._kwargs.items()}
        return {"Filter": {"name": self._filter._func, "args": args, "kwargs": kwargs}}

    def _sources(self):
        """Return the union of the sources of all expression arguments."""
        found = set()
        for arg in (*self._args, *self._kwargs.values()):
            if type(arg) is FilterExpr or type(arg) is SourceExpr:
                found |= arg._sources()
        return found

    def _filters(self):
        """Return a dict mapping filter name to `Filter` for this subtree.

        Only `FilterExpr` arguments are descended into. When the same name
        occurs more than once, the entry visited last wins.
        """
        found = {self._filter._func: self._filter}
        for arg in (*self._args, *self._kwargs.values()):
            if type(arg) is FilterExpr:
                found.update(arg._filters())
        return found