Thursday, November 6, 2025

Panasonic Viera Remote Control app w/ Python.

Wiz Light akıllı ampüller gibi resmi mobil uygulaması olup resmi desktop uygulaması olmayan bir başka cihaz ekosistemi de Panasonic Viera serisi TV'ler. Bu TV'lerin LAN üzerinden kontrol edilebilmesini sağlayan uzaktan kumanda uygulaması için ekran görüntüsü ve Python çözümü aşağıda.



Panasonic Viera - UPnP discovery + simple remote control (CLI + Tkinter GUI)
Works without PIN (if your TV doesn't require pairing).
Discovery via SSDP -> fetch device description -> find remote control service.
If discovery fails, you can enter IP manually.

Dependencies:
    pip install requests

Run:
    python panasonic_remote.py        # opens GUI
    python panasonic_remote.py --cli  # example CLI mode

Author: kadir — lightweight and pragmatic.
"""
import sys
import socket
import time
import threading
import requests
import xml.etree.ElementTree as ET
from urllib.parse import urljoin, urlparse
import argparse

try:
    import tkinter as tk
    from tkinter import simpledialog, messagebox
    HAS_TK = True
except Exception:
    HAS_TK = False

SSDP_ADDR = "239.255.255.250"
SSDP_PORT = 1900
SSDP_MX = 2
# Some Panasonic TVs respond to urn:schemas-upnp-org:device:MediaRenderer:1
SSDP_STS = [
    "urn:schemas-upnp-org:device:MediaRenderer:1",
    "ssdp:all",
    "urn:schemas-sony-com:service:IRCC:1",  # sometimes other vendors, harmless
]

DISCOVERY_RETRIES = 3
DISCOVERY_TIMEOUT = 2.0  # seconds per SSDP recv
HTTP_TIMEOUT = 4.0

# Common Panasonic remote keys (NRC_* family). You can add more if needed.
COMMON_KEYS = [
    "NRC_POWER-ONOFF", "NRC_VOLUP-ONOFF", "NRC_VOLDOWN-ONOFF",
    "NRC_CH_UP-ONOFF", "NRC_CH_DOWN-ONOFF", "NRC_MUTE-ONOFF",
    "NRC_MENU-ONOFF", "NRC_EXIT-ONOFF", "NRC_HOME-ONOFF",
    "NRC_INFO-ONOFF", "NRC_EPG-ONOFF", "NRC_GUIDE-ONOFF",
]

def ssdp_search(st, timeout=DISCOVERY_TIMEOUT, mx=SSDP_MX):
    """Send a single SSDP M-SEARCH and collect responses (non-blocking)."""
    msg = "\r\n".join([
        'M-SEARCH * HTTP/1.1',
        f'HOST: {SSDP_ADDR}:{SSDP_PORT}',
        'MAN: "ssdp:discover"',
        f'ST: {st}',
        f'MX: {mx}',
        '', ''
    ]).encode('utf-8')

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    sock.settimeout(timeout)
    # Set TTL to 2 so it stays in local net
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)

    try:
        sock.sendto(msg, (SSDP_ADDR, SSDP_PORT))
    except Exception as e:
        print("SSDP send error:", e)
        sock.close()
        return []

    responses = []
    t0 = time.time()
    while True:
        try:
            data, addr = sock.recvfrom(65507)
            text = data.decode('utf-8', errors='replace')
            responses.append((text, addr))
        except socket.timeout:
            break
        except Exception:
            break
        # small safety to not loop forever
        if time.time() - t0 > timeout + 0.5:
            break
    sock.close()
    return responses

def parse_ssdp_response(text):
    """Parse SSDP response into dict of headers (case-insensitive)."""
    lines = text.splitlines()
    headers = {}
    for line in lines[1:]:
        if ':' in line:
            k, v = line.split(':', 1)
            headers[k.strip().upper()] = v.strip()
    return headers

def discover_panasonic(timeout=DISCOVERY_TIMEOUT, retries=DISCOVERY_RETRIES):
    """Try multiple SSDP searches and return list of candidate device LOCATION URLs."""
    locations = {}
    for attempt in range(retries):
        for st in SSDP_STS:
            resps = ssdp_search(st, timeout=timeout)
            for text, addr in resps:
                hdr = parse_ssdp_response(text)
                loc = hdr.get('LOCATION') or hdr.get('LOCATION'.upper())
                st_hdr = hdr.get('ST') or hdr.get('NT')
                usn = hdr.get('USN', '')
                if loc:
                    # Use parsed netloc as key to reduce duplicates
                    try:
                        key = urlparse(loc).netloc
                    except Exception:
                        key = loc
                    locations[key] = loc
        # quick sleep between retries
        time.sleep(0.25)
        if locations:
            break
    return list(locations.values())

def fetch_device_description(location_url):
    """GET device description XML and parse services. Return base URL and XML root."""
    try:
        r = requests.get(location_url, timeout=HTTP_TIMEOUT)
        r.raise_for_status()
        xmltext = r.text
        root = ET.fromstring(xmltext)
        # Base URL: try to read  or use location origin
        base_elem = root.find('{urn:schemas-upnp-org:device-1-0}URLBase')
        if base_elem is not None and base_elem.text:
            base = base_elem.text.strip()
        else:
            # fallback to scheme+netloc
            p = urlparse(location_url)
            base = f"{p.scheme}://{p.netloc}"
        return base, root
    except Exception as e:
        # print("fetch description error:", e)
        return None, None

def find_remote_control_service(base_url, root):
    """
    Scan device description for a service that provides Panasonic remote control.
    Commonly RemoteControl or IRCC-like services. Return control URL (absolute).
    """
    # Namespaces might vary; map known ones
    ns = {'upnp': 'urn:schemas-upnp-org:device-1-0'}
    service_list = root.findall('.//upnp:service', namespaces=ns)
    for svc in service_list:
        svc_type = svc.find('upnp:serviceType', namespaces=ns)
        svc_control = svc.find('upnp:controlURL', namespaces=ns)
        svc_event = svc.find('upnp:eventSubURL', namespaces=ns)
        if svc_type is None or svc_control is None:
            continue
        st = svc_type.text or ''
        ctrl = svc_control.text or ''
        # heuristics: Panasonic IR/Remote often has "IRCC" or "NRC" or specific Panasonic service types
        if 'IRCC' in st or 'RenderingControl' in st or 'AVTransport' in st or 'panasonic' in st.lower() or 'nrc' in st.lower():
            ctrl_abs = urljoin(base_url, ctrl)
            return ctrl_abs
    # fallback: try first service controlURL
    first = root.find('.//upnp:service/upnp:controlURL', namespaces=ns)
    if first is not None and first.text:
        return urljoin(base_url, first.text)
    return None

def send_key_http(remote_control_url, key):
    """
    Send remote key via HTTP SOAP. Many Panasonic TVs accept POST to /nrc/control_0 or similar.
    This implementation uses common Panasonic IRCC SOAP format.
    """
    # Build simple SOAP message that many Panasonic TVs accept:
    soap_body = f"""
    
      
        
          {key}
        
      
    """

    headers = {
        "Content-Type": "text/xml; charset=utf-8",
        "SOAPACTION": '"urn:panasonic-com:service:p00NetworkControl:1#X_SendKey"',
        "Connection": "close"
    }

    # Try POST; some TVs expect different endpoint paths; we'll attempt variations if necessary.
    candidates = [remote_control_url]
    # common alternative endpoints:
    parsed = urlparse(remote_control_url)
    base = f"{parsed.scheme}://{parsed.netloc}"
    alt_paths = [
        "/nrc/control_0", "/dmr/control_0", "/upnp/control/remote", "/nrc/control_0/HTTP/1.1"
    ]
    for p in alt_paths:
        url = urljoin(base, p)
        if url not in candidates:
            candidates.append(url)

    last_err = None
    for url in candidates:
        try:
            r = requests.post(url, data=soap_body.encode('utf-8'), headers=headers, timeout=HTTP_TIMEOUT)
            # Accept success codes (200/202) and even 500 sometimes (some TVs respond weirdly)
            if r.status_code in (200, 202, 500, 404):
                return True, url, r.status_code
            else:
                last_err = (url, r.status_code, r.text[:200])
        except Exception as e:
            last_err = (url, str(e))
    return False, last_err, None

class PanasonicRemote:
    def __init__(self):
        self.control_url = None
        self.last_location = None
        self.cached_host = None

    def discover_and_setup(self):
        locations = discover_panasonic()
        if not locations:
            return False, "No devices discovered via SSDP."
        # try each location until we find a control URL
        for loc in locations:
            base, root = fetch_device_description(loc)
            if not base or not root:
                continue
            ctrl = find_remote_control_service(base, root)
            if ctrl:
                self.control_url = ctrl
                self.last_location = loc
                self.cached_host = urlparse(loc).netloc
                return True, ctrl
        return False, "No control service found in device descriptions."

    def set_manual_ip(self, ip_or_url):
        # Accept either bare IP or full URL; convert to likely control endpoint
        if ip_or_url.startswith("http://") or ip_or_url.startswith("https://"):
            base = ip_or_url
        else:
            base = f"http://{ip_or_url}"
        # try fetching device description at /description.xml or /dd.xml common endpoints
        candidates = [base, urljoin(base, "/description.xml"), urljoin(base, "/dd.xml")]
        for c in candidates:
            try:
                b, r = fetch_device_description(c)
                if b and r:
                    ctrl = find_remote_control_service(b, r)
                    if ctrl:
                        self.control_url = ctrl
                        self.last_location = c
                        self.cached_host = urlparse(c).netloc
                        return True, ctrl
            except Exception:
                continue
        # fallback: set a plausible control endpoint
        self.control_url = urljoin(base, "/nrc/control_0")
        self.last_location = base
        self.cached_host = urlparse(base).netloc
        return True, self.control_url

    def send_key(self, key, retries=2):
        if not self.control_url:
            return False, "No control URL configured."

        for attempt in range(retries):
            ok, info, status = send_key_http(self.control_url, key)
            if ok:
                return True, info
            # on failure, try rediscover once
            if attempt == 0:
                # try quick rediscover if SSDP available
                try:
                    found, detail = self.discover_and_setup()
                    # if rediscovered a different endpoint, try again
                except Exception:
                    pass
            time.sleep(0.3)
        return False, info

# --- Simple CLI ---
def cli_mode():
    pr = PanasonicRemote()
    print("Discovering Panasonic TVs via SSDP...")
    ok, info = pr.discover_and_setup()
    if ok:
        print("Found control URL:", info)
    else:
        print("Discovery failed:", info)
        manual = input("Manual IP or URL (or Enter to abort): ").strip()
        if not manual:
            print("Aborted.")
            return
        ok, info = pr.set_manual_ip(manual)
        if ok:
            print("Using control URL (manual):", info)
        else:
            print("Manual setup failed:", info)
            return

    print("Available keys (common):", ", ".join(COMMON_KEYS))
    print("Type key and Enter to send. 'exit' to quit.")
    while True:
        k = input("key> ").strip()
        if not k:
            continue
        if k.lower() in ("exit", "quit"):
            break
        ok, info = pr.send_key(k)
        if ok:
            print("OK ->", info)
        else:
            print("FAIL ->", info)

# --- Simple Tkinter GUI ---
def gui_mode():
    if not HAS_TK:
        print("Tkinter not available on this Python. Use --cli.")
        return
    pr = PanasonicRemote()

    root = tk.Tk()
    root.title("Panasonic Remote — kadir")
    root.geometry("420x360")

    status_var = tk.StringVar(value="Durum: keşfediliyor... (SSDP)")
    ip_var = tk.StringVar(value="")

    def discover_thread():
        status_var.set("Keşfediliyor...")
        ok, info = pr.discover_and_setup()
        if ok:
            status_var.set(f"Bulundu: {pr.cached_host}")
            ip_var.set(pr.cached_host)
        else:
            status_var.set("Keşif başarısız. IP gir veya Yeniden Dene.")
            ip_var.set("")

    def on_discover():
        threading.Thread(target=discover_thread, daemon=True).start()

    def on_manual():
        ip = simpledialog.askstring("Manual IP", "TV IP veya URL gir:", initialvalue=ip_var.get())
        if not ip:
            return
        ok, info = pr.set_manual_ip(ip)
        if ok:
            status_var.set(f"Manual set: {pr.cached_host}")
            ip_var.set(pr.cached_host)
        else:
            messagebox.showerror("Hata", "Manual kurulum başarısız.")

    def send_key_gui(key):
        status_var.set(f"Sending {key} ...")
        def work():
            ok, info = pr.send_key(key)
            if ok:
                status_var.set(f"Sent {key}")
            else:
                status_var.set(f"Failed {key}: {info}")
        threading.Thread(target=work, daemon=True).start()

    # Layout
    top = tk.Frame(root, pady=6)
    top.pack(fill=tk.X)
    tk.Label(top, textvariable=status_var, anchor='w').pack(fill=tk.X, padx=6)
    ctrl = tk.Frame(root)
    ctrl.pack(padx=6, pady=8, fill=tk.BOTH, expand=True)
    # Buttons grid
    btns = [
        ("Power", "NRC_POWER-ONOFF"),
        ("Vol+", "NRC_VOLUP-ONOFF"),
        ("Vol-", "NRC_VOLDOWN-ONOFF"),
        ("Mute", "NRC_MUTE-ONOFF"),
        ("CH+", "NRC_CH_UP-ONOFF"),
        ("CH-", "NRC_CH_DOWN-ONOFF"),
        ("Menu", "NRC_MENU-ONOFF"),
        ("Home", "NRC_HOME-ONOFF"),
        ("Info", "NRC_INFO-ONOFF"),
        ("Exit", "NRC_EXIT-ONOFF"),
        ("Return", "NRC_RETURN-ONOFF"),
    ]
    r = 0; c = 0
    for label, k in btns:
        b = tk.Button(ctrl, text=label, width=10, command=lambda kk=k: send_key_gui(kk))
        b.grid(row=r, column=c, padx=4, pady=4)
        c += 1
        if c >= 3:
            c = 0; r += 1

    bottom = tk.Frame(root, pady=4)
    bottom.pack(fill=tk.X)
    tk.Button(bottom, text="Discover", command=on_discover).pack(side=tk.LEFT, padx=6)
    tk.Button(bottom, text="Manual IP", command=on_manual).pack(side=tk.LEFT)
    tk.Button(bottom, text="Quit", command=root.quit).pack(side=tk.RIGHT, padx=6)

    # Start initial discovery
    threading.Thread(target=discover_thread, daemon=True).start()
    root.mainloop()

# --- main ---
def main():
    p = argparse.ArgumentParser()
    p.add_argument("--cli", action="store_true", help="run CLI mode")
    args = p.parse_args()
    if args.cli:
        cli_mode()
    else:
        gui_mode()

if __name__ == "__main__":
    main()