import sys sys.coinit_flags= 0 import win32com.client import pywintypes import os import shutil import http_server_39 as server #import http.server as server import socketserver import threading import asyncio import websockets import logging, json import urllib import posixpath import time import pythoncom logging.basicConfig() global current_slideshow current_slideshow = None CACHEDIR = r'''C:\Windows\Temp\ppt-cache''' CACHE_TIMEOUT = 2*60*60 class Handler(server.SimpleHTTPRequestHandler): def __init__(self, *args, **kwargs): super().__init__(*args, directory=os.path.dirname(os.path.realpath(__file__))) def translate_path(self, path): """Translate a /-separated PATH to the local filename syntax. Components that mean special things to the local file system (e.g. drive or directory names) are ignored. (XXX They should probably be diagnosed.) """ # abandon query parameters path = path.split('?',1)[0] path = path.split('#',1)[0] # Don't forget explicit trailing slash when normalizing. Issue17324 trailing_slash = path.rstrip().endswith('/') try: path = urllib.parse.unquote(path, errors='surrogatepass') except UnicodeDecodeError: path = urllib.parse.unquote(path) path = posixpath.normpath(path) words = path.split('/') words = list(filter(None, words)) if len(words) > 0 and words[0] == "cache": if current_slideshow: path = CACHEDIR + "\\" + current_slideshow.name() words.pop(0) else: path = self.directory for word in words: if os.path.dirname(word) or word in (os.curdir, os.pardir): # Ignore components that are not a simple file/directory name continue path = os.path.join(path, word) if trailing_slash: path += '/' return path def run_http(): http_server = server.HTTPServer(("", 80), Handler) http_server.serve_forever() STATE = {"connected": 0, "current": 0, "total": 0, "visible": 0, "name": ""} USERS = set() def state_event(): return json.dumps({"type": "state", **STATE}) def users_event(): return json.dumps({"type": "users", "count": len(USERS)}) async def notify_state(): global current_slideshow if current_slideshow: STATE["current"] = current_slideshow.current_slide() STATE["total"] = current_slideshow.total_slides() STATE["visible"] = current_slideshow.visible() STATE["name"] = current_slideshow.name() if USERS: # asyncio.wait doesn't accept an empty list message = state_event() await asyncio.wait([user.send(message) for user in USERS]) async def notify_users(): if USERS: # asyncio.wait doesn't accept an empty list message = users_event() await asyncio.wait([user.send(message) for user in USERS]) async def register(websocket): USERS.add(websocket) await notify_users() async def unregister(websocket): USERS.remove(websocket) await notify_users() async def ws_handle(websocket, path): global current_slideshow # register(websocket) sends user_event() to websocket await register(websocket) try: await websocket.send(state_event()) async for message in websocket: data = json.loads(message) if data["action"] == "prev": if current_slideshow: current_slideshow.prev() await notify_state() elif data["action"] == "next": if current_slideshow: current_slideshow.next() await notify_state() elif data["action"] == "first": if current_slideshow: current_slideshow.first() await notify_state() elif data["action"] == "last": if current_slideshow: current_slideshow.last() await notify_state() elif data["action"] == "black": if current_slideshow: if current_slideshow.visible() == 3: current_slideshow.normal() else: current_slideshow.black() await notify_state() elif data["action"] == "white": if current_slideshow: if current_slideshow.visible() == 4: current_slideshow.normal() else: current_slideshow.white() await notify_state() elif data["action"] == "goto": if current_slideshow: current_slideshow.goto(int(data["value"])) await notify_state() elif data["action"] == "refresh": if current_slideshow: current_slideshow.export_current_next() current_slideshow.refresh() await notify_state() else: logging.error("unsupported event: {}", data) finally: await unregister(websocket) def run_ws(): pythoncom.CoInitializeEx(pythoncom.COINIT_MULTITHREADED) asyncio.set_event_loop(asyncio.new_event_loop()) start_server = websockets.serve(ws_handle, "0.0.0.0", 5678) asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_forever() def start_server(): #STATE["current"] = current_slide() http_daemon = threading.Thread(name="http_daemon", target=run_http) http_daemon.setDaemon(True) http_daemon.start() print("Started HTTP server") #run_ws() ws_daemon = threading.Thread(name="ws_daemon", target=run_ws) ws_daemon.setDaemon(True) ws_daemon.start() print("Started websocket server") #try: # ws_daemon.start() # http_daemon.start() #except (KeyboardInterrupt, SystemExit): # cleanup_stop_thread() # sys.exit() class Slideshow: def __init__(self, instance): self.instance = instance if self.instance is None: raise ValueError("PPT instance cannot be None") if self.instance.SlideShowWindows.Count == 0: raise ValueError("PPT instance has no slideshow windows") self.view = self.instance.SlideShowWindows[0].View if self.instance.ActivePresentation is None: raise ValueError("PPT instance has no active presentation") self.presentation = self.instance.ActivePresentation self.export_all() def refresh(self): if self.instance is None: raise ValueError("PPT instance cannot be None") #if self.instance.SlideShowWindows.Count == 0: # raise ValueError("PPT instance has no slideshow windows") self.view = self.instance.SlideShowWindows[0].View if self.instance.ActivePresentation is None: raise ValueError("PPT instance has no active presentation") def total_slides(self): return len(self.presentation.Slides) def current_slide(self): return self.view.CurrentShowPosition def visible(self): return self.view.State def prev(self): self.refresh() self.view.Previous() def next(self): self.refresh() self.view.Next() self.export_current_next() def first(self): self.refresh() self.view.First() self.export_current_next() def last(self): self.refresh() self.view.Last() self.export_current_next() def goto(self, slide): self.refresh() if slide <= self.total_slides(): self.view.GotoSlide(slide) else: self.last() self.next() self.export_current_next() def black(self): self.refresh() self.view.State = 3 def white(self): self.refresh() self.view.State = 4 def normal(self): self.refresh() self.view.State = 1 def name(self): return self.presentation.Name def export_current_next(self): self.export(self.current_slide()) self.export(self.current_slide() + 1) def export(self, slide): destination = CACHEDIR + "\\" + self.name() + "\\" + str(slide) + ".jpg" os.makedirs(os.path.dirname(destination), exist_ok=True) if not os.path.exists(destination) or time.time() - os.path.getmtime(destination) > CACHE_TIMEOUT: if slide <= self.total_slides(): attempts = 0 while attempts < 3: try: self.presentation.Slides(slide).Export(destination, "JPG") time.sleep(0.5) break except: pass attempts += 1 elif slide == self.total_slides() + 1: shutil.copyfileobj(open(os.path.dirname(os.path.realpath(__file__)) + r'''\black.jpg''', 'rb'), open(destination, 'wb')) else: pass def export_all(self): for i in range(1, self.total_slides()): self.export(i) def get_ppt_instance(): instance = win32com.client.Dispatch('Powerpoint.Application') if instance is None or instance.SlideShowWindows.Count == 0: return None return instance def get_current_slideshow(): return current_slideshow if __name__ == "__main__": start_server() while True: # Check if PowerPoint is running instance = get_ppt_instance() try: current_slideshow = Slideshow(instance) STATE["connected"] = 1 STATE["current"] = current_slideshow.current_slide() STATE["total"] = current_slideshow.total_slides() print("Connected to PowerPoint instance") break except ValueError as e: current_slideshow = None pass time.sleep(1)