June 14, 2026
I Accessed 2,000+ Unsecured Ollama Servers
Victor Condino
There is an unsettling trend within the open-source machine learning community: security is often an afterthought. ML is a young field, after all, and frequently the tools and frameworks that power it are so new that they just haven't existed for long enough to be thoroughly battle-tested. One good example of this is Ollama.
What is Ollama?
In case you're not already familiar, Ollama is a popular open-source program that lets anyone with a capable GPU interact ad infinitum with powerful large language models (LLMs) running locally, completely offline, and free from the subscription fees and privacy concerns that come with the alternative: sending every prompt to a hosted inference platform like the Google Gemini API or Anthropic's Claude API, which charge a minuscule fee per token and, depending on the provider's terms, may retain conversations or use them to train future models.
Tokens can add up quickly, and with agentic workflows and agent teams, which have become quite fashionable lately, it's far too easy to produce traces with millions of tokens from a single overly broad request or one with ill-defined goals or stopping conditions. For many, running models locally is an attractive alternative, despite the slightly lower benchmark scores. I've never been a big fan of intelligence tests, though, and it stands to reason that if you can't accurately assess a person's overall level of intelligence with a glorified questionnaire, then it would be just as foolhardy to rely too much on similar metrics when applied to thinking machines.
Ollama owes much of its success to its ease of use: it is easy to install and get up and running, and it offers a no-frills conversational text interface through which the forward-thinking user can converse with a veritable pantheon of freely available instruction-following chatbot models. It has built-in support for fetching models directly from its own model library and from popular free hosting sites like Hugging Face, and it uses interoperability standards adopted by the ML community, like an OpenAI-compatible inference API, allowing it to be used as a drop-in replacement for the engines powering popular coding tools like Claude Code and automation tools like OpenClaw. It can even run so-called embedding models, which are central to RAG and knowledge-base semantic search applications, where, again, the lack of a round trip to a server managed by a third party is an obvious advantage to anyone for whom data security and privacy are a concern (which should be all of us).
How Ollama Works
Ollama is designed with a two-part client-server architecture, separating the part which does the heavy lifting of computing the model's activations and decoding them back into textual responses from the part of the program that provides an interactive user interface which displays the conversation history and allows users to compose and submit their prompts to be processed by the model. These two separate concerns fit nicely into the dual roles of a server and client, respectively.
We normally think of clients and servers as programs that run on separate machines linked by a network like a LAN or the Internet. It is just as common, though, to have the client and server running on the same computer. In that case the connection never actually flows through a physical network; the operating system simply passes the blocks of information and commands back and forth between the two running programs over the loopback interface, entirely within the computer's memory.
This added complexity might seem unnecessary, but there are numerous advantages to this approach. One server process can provide access to a large resource like a multi-gigabyte language model to multiple running client processes, queueing and processing requests that require exclusive access to the model and GPU serially, ensuring that each client waits its turn. Being separate processes, if a client should crash due to an unrecoverable error, the server's process is unaffected, allowing other client instances to continue functioning normally. It also simplifies development; programmers can code and test the two programs independently; the only overlap is the format of the messages that they use to communicate the prompts to process and the model's responses.
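The turn-taking described above can be sketched with a lock guarding a shared resource. This is a toy model under stated assumptions, not Ollama's actual scheduler: `fake_generate`, `serve_clients`, and the client names are hypothetical stand-ins, and the "inference" is just a short sleep.

```python
import asyncio


async def fake_generate(model_lock: asyncio.Lock, client: str, log: list[str]) -> None:
    """Hypothetical stand-in for a request that needs exclusive use of the model."""
    async with model_lock:         # only one client may hold the model at a time
        log.append(f"{client} start")
        await asyncio.sleep(0.01)  # pretend to run inference
        log.append(f"{client} done")


async def serve_clients(clients: list[str]) -> list[str]:
    """Let every client submit at once and record the order of service."""
    model_lock = asyncio.Lock()
    log: list[str] = []
    # All requests arrive together; the lock forces them to be served one by one.
    await asyncio.gather(*(fake_generate(model_lock, c, log) for c in clients))
    return log


print(asyncio.run(serve_clients(["cli", "editor-plugin", "batch-job"])))
```

In the printed log, each client's "start" and "done" entries stay adjacent: no request begins until the previous one has released the model.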
Since Ollama was designed to be used in this local-only configuration, its connections are not authenticated in any way: the server does not require clients to submit credentials or "log in" with a user name and password or a secret token of any kind. Doing so would make it more difficult to use with little to no practical advantage, since by default the server listens only on the loopback address (127.0.0.1, port 11434), and connections from the outside are normally also prevented by the computer's firewall or by the router which connects the devices on your local network to the internet at large, allowing outbound connections but blocking any incoming connection requests. Bottom line: if you have installed Ollama on your desktop box, rest assured, you are probably not at risk.
When Is Ollama Insecure?
There are more advanced scenarios, though. Renting cloud-hosted GPU servers is common for AI-powered subscription services that need to scale capacity flexibly as usage fluctuates, where Ollama powers some AI feature that is just one piece of a larger application. It is also common in academic and commercial research and development efforts which run many instances of Ollama at a large scale to process batched prompts. In these cases, the Ollama server runs on a networked machine which must be reachable from whatever machine the client is running on. This makes things trickier, because now Ollama must accept connections from other machines on the network despite lacking, as noted above, any type of built-in authentication.
There are ways to mitigate this shortcoming: one could limit connections to the Ollama server socket using a firewall like iptables, and tunnel the connections from the client over a secure protocol which supports connection forwarding, such as ssh. This is generally the route taken when this type of situation arises, but it is not exactly trivial to implement, and even less trivial to get working reliably and seamlessly. The ssh client and server must be installed and configured for automatic, passwordless authentication using private and public key pairs, and the tunneled connection must be specified on the command line when connecting to the machine running the Ollama server. Timeouts and automatic reconnection must also be configured properly, or the tunneled port is liable to stop working when the connection goes idle or the network connection becomes momentarily unstable.
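To make the moving parts of such a tunnel concrete, here is a minimal sketch that only assembles the ssh command line. The helper name `build_tunnel_command` and the host `user@gpu-box.example.com` are made up for illustration; the flags themselves (`-N`, `-L`, `ExitOnForwardFailure`, `ServerAliveInterval`, `ServerAliveCountMax`) are standard OpenSSH options.

```python
import shlex


def build_tunnel_command(
    remote: str,
    local_port: int = 11434,
    remote_port: int = 11434,
    keepalive_s: int = 30,
) -> list[str]:
    """Build an ssh argv that forwards a local port to Ollama on the remote host."""
    return [
        "ssh",
        "-N",                                            # forwarding only, no remote shell
        "-L", f"{local_port}:127.0.0.1:{remote_port}",   # local port -> remote loopback
        "-o", "ExitOnForwardFailure=yes",                # fail fast if the port can't be bound
        "-o", f"ServerAliveInterval={keepalive_s}",      # send keepalives while idle
        "-o", "ServerAliveCountMax=3",                   # drop the link after 3 missed replies
        remote,
    ]


# Hypothetical host name, for illustration only.
cmd = build_tunnel_command("user@gpu-box.example.com")
print(shlex.join(cmd))
```

With the tunnel up, a client on the local machine talks to 127.0.0.1:11434 as if the server were local, while the remote firewall only needs to admit ssh. Note that ssh does not reconnect on its own; a supervising loop or a wrapper such as autossh would still be needed for that.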
All of this is what led me to believe that there must be many unsecured Ollama servers running in the wild. The only thing stopping anyone from connecting to and using these open servers is not knowing their IP addresses.
How I Found Hosts Running Ollama
You might think that this textbook example of security by obscurity would be the saving grace for anyone who runs an Ollama server on an unprotected machine with internet access, but the internet is a relatively unrestricted medium. There is literally nothing stopping someone from just randomly (or methodically) trying to connect to every single port on every single possible address, to see which ones will accept connections and what the programs listening for those connections have to say to introduce themselves.
An exhaustive enumeration of every possible combination of port and IP address would take a fairly long time to run, even with a very, very fast internet connection. Fortunately, this work has already been done for us, and what's more, we can search the results through a convenient and easy-to-use web interface on sites like leakix.net.
Port Scanning In Python
This is a valuable resource for cybersecurity researchers and curious individuals like myself, so I wrote a Python script which uses their API to automate searching and tabulating the results. You can find it here, and it looks like this:
"""
Search the leakix.net database and save the results to a .jsonl file for
further processing.

Requires an API key, get it [here](https://leakix.net/settings/api).
Pass it on the command line with the --api-key option argument or put it
in a .env file, like this: `LEAKIX_API_KEY=<your-key>`. Their API is free
to use (with generous limits), but please don't abuse it, or you will
likely be banned, or worse.

- Vic <un1tz3r0@gmail.com> Jun 13, 2026
"""

import asyncio
import json
import re

import aiofiles
import aiohttp
import click
from aiopathlib import AsyncPath
from rich.console import Console
from rich.progress import (
    Progress,
    SpinnerColumn,
    TextColumn,
    TimeElapsedColumn,
)

LEAKIX_API_KEY = None
SEARCH_URL = "https://leakix.net/search"


def load_env(path: str = ".env") -> dict[str, str]:
    '''
    Load environment variables from a .env file.

    Args:
        path (str): Path to the .env file. Defaults to ".env".

    Returns:
        dict[str, str]: A dictionary containing the environment variables.
        Empty if the file does not exist.

    The .env file should have lines in the format KEY=VALUE. Lines starting
    with # are treated as comments and ignored. Values can be optionally
    enclosed in single or double quotes. If enclosed in double quotes, escape
    sequences will be processed.
    '''
    env: dict[str, str] = {}
    try:
        with open(path, encoding="utf-8") as f:
            lines = f.readlines()
    except FileNotFoundError:
        # No .env file: fall back to the environment variable or --api-key.
        return env
    for line in lines:
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        if line.startswith("export "):
            line = line[7:].lstrip()
        key, sep, value = line.partition("=")
        if not sep:
            continue
        key = key.strip()
        if not key:
            continue
        value = value.strip()
        if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'):
            quote = value[0]
            value = value[1:-1]
            if quote == '"':
                value = value.encode("utf-8").decode("unicode_escape")
        env[key] = value
    return env


# load the api key from the .env file if not set via env var or command line
LEAKIX_API_KEY = load_env().get("LEAKIX_API_KEY") or LEAKIX_API_KEY


def sanitize_query(q: str) -> str:
    # Turn a search query into something safe to use in a file name.
    slug = re.sub(r"[^A-Za-z0-9._-]+", "-", q).strip("-")
    return slug or "query"


async def fetch_page(
    session: aiohttp.ClientSession,
    query: str,
    page: int,
    scope: str,
    api_key: str,
    console: Console,
) -> tuple[list | None, int, str | None]:
    # Returns (data, status, error_body); data is None when the request failed.
    headers = {"api-key": api_key, "accept": "application/json"}
    params = {"q": query, "scope": scope, "page": str(page)}
    tries = 0
    while True:
        tries = tries + 1
        console.print(
            f"[cyan]Fetch URL {SEARCH_URL} with params {params} (attempt {tries})..."
        )
        async with session.get(SEARCH_URL, headers=headers, params=params) as resp:
            if resp.status == 429:
                # Rate limited: wait as long as the server asks, then retry.
                retry_after_raw = resp.headers.get("Retry-After", "5")
                try:
                    retry_after = max(1, int(float(retry_after_raw)))
                except ValueError:
                    retry_after = 5
                console.print(
                    f"[yellow]429 on page {page}; sleeping {retry_after}s per Retry-After[/]"
                )
                await asyncio.sleep(retry_after)
                continue
            if resp.status >= 400:
                body = await resp.text()
                console.print(
                    f"[red]{resp.status} on page {page}; got {len(body)} bytes of content[/]"
                )
                return None, resp.status, body
            data = await resp.json()
            return data, resp.status, None


async def amain(
    query: str, scope: str, output: str, api_key: str, max_pages: int | None
) -> None:
    console = Console()
    out_path = AsyncPath(output)
    console.print(f"[bold cyan]Query:[/] {query}")
    console.print(f"[bold cyan]Scope:[/] {scope}")
    console.print(f"[bold cyan]Output:[/] {output} (append)")
    if max_pages is not None:
        console.print(f"[bold cyan]Pages:[/] cap at {max_pages}")
    total_items = 0
    page = 0
    stop_reason = "unknown"
    timeout = aiohttp.ClientTimeout(total=60)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with aiofiles.open(out_path, "a") as f:
            with Progress(
                SpinnerColumn(),
                TextColumn("[progress.description]{task.description}"),
                TimeElapsedColumn(),
                console=console,
                transient=False,
            ) as progress:
                task = progress.add_task("starting...", total=None)
                # Fetch pages until one comes back empty, fails, or the cap is hit.
                while True:
                    if max_pages is not None and page >= max_pages:
                        stop_reason = f"reached --max-pages={max_pages}"
                        break
                    progress.update(
                        task,
                        description=f"page {page} | items so far: {total_items}",
                    )
                    try:
                        data, status, err = await fetch_page(
                            session, query, page, scope, api_key, console
                        )
                    except aiohttp.ClientError as e:
                        stop_reason = f"network error on page {page}: {e!r}"
                        break
                    if err is not None:
                        snippet = (err or "").strip()[:500]
                        console.print(
                            f"[red]HTTP {status} on page {page}[/]\n[red]{snippet}[/]"
                        )
                        stop_reason = f"HTTP {status} on page {page}"
                        break
                    if not data:
                        stop_reason = f"empty page at page {page}"
                        break
                    if not isinstance(data, list):
                        stop_reason = f"unexpected response shape on page {page}: {type(data).__name__}"
                        console.print(f"[red]{stop_reason}[/]")
                        break
                    # One JSON object per line (JSONL), flushed after every page.
                    chunk = "".join(json.dumps(item) + "\n" for item in data)
                    await f.write(chunk)
                    await f.flush()
                    total_items += len(data)
                    console.print(f"[green]Got page {page} with {len(data)} items on it.")
                    page += 1
    console.print()
    console.print(f"[bold green]Done.[/] Got {total_items} items across {page} page(s).")
    console.print(f"[dim]Stop reason: {stop_reason}[/]")
    console.print(f"[dim]Output: {output}[/]")


@click.command()
@click.option(
    "--query",
    "-q",
    default="port:11434",
    show_default=True,
    help="LeakIX search query (e.g. 'port:11434').",
)
@click.option(
    "--scope",
    "-s",
    type=click.Choice(["service", "leak"]),
    default="service",
    show_default=True,
    help="LeakIX search scope.",
)
@click.option(
    "--output",
    "-o",
    default=None,
    help="Output JSONL path. Defaults to results-<sanitized-query>.jsonl",
)
@click.option(
    "--api-key",
    default=LEAKIX_API_KEY,
    envvar="LEAKIX_API_KEY",
    help="LeakIX API key (or set LEAKIX_API_KEY env var).",
)
@click.option(
    "--max-pages",
    type=int,
    default=None,
    help="Optional cap on number of pages to fetch.",
)
def main(
    query: str,
    scope: str,
    output: str | None,
    api_key: str | None,
    max_pages: int | None,
) -> None:
    if not api_key:
        raise click.UsageError(
            "No API key given; pass --api-key, set LEAKIX_API_KEY, or add it to .env."
        )
    if output is None:
        output = f"results-{sanitize_query(query)}.jsonl"
    print("# Hello from leakix-search!\n")
    asyncio.run(amain(query, scope, output, api_key, max_pages))


if __name__ == "__main__":
    main()
When we run this script (don't forget to specify an API key using --api-key), it will show a progress message saying that it is fetching page after page of search results from the leakix.net service search API. It should count up steadily, eventually fetching a total of 500 pages, with 20 results each, for a total of 10,000 results, which it saves as JSON objects, one per line, in a file named results-port-11434.jsonl. Each one looks something like this, pretty-printed for readability:
{
  "event_type": "service",
  "event_source": "HttpPlugin",
  "event_pipeline": ["tcpid", "HttpPlugin"],
  "event_fingerprint": "6d1f2e7caae99a60964647f5b908983930bafc8f97400b2397400b23dd52604a",
  "ip": "4.250.xxx.xxx",
  "host": "4.250.xxx.xxx",
  "reverse": "",
  "port": "11434",
  "mac": "",
  "vendor": "",
  "transport": ["tcp", "http"],
  "protocol": "http",
  "http": {
    "root": "/",
    "url": "/",
    "status": 200,
    "length": 17,
    "header": {
      "content-length": "17"
    },
    "title": "",
    "favicon_hash": ""
  },
  "summary":
    "HTTP/1.1 200 OK\n"
    "Content-Type: text/plain; charset=utf-8\r\n"
    "Date: Mon, 18 May 2026 19:56:51 GMT\r\n"
    "Content-Length: 17\r\n"
    "Connection: close\r\n"
    "\n\n"
    "Ollama is running",
  "time": "2026-05-18T19:56:51.479985914Z",
  /* ... empty fields omitted for brevity ... */
  "tags": ["rescan"],
  "geoip": {
    "continent_name": "Europe",
    "region_iso_code": "GB-ENG",
    "city_name": "London",
    "country_iso_code": "GB",
    "country_name": "United Kingdom",
    "region_name": "England",
    "location": {
      "lat": 5x.xxxx,
      "lon": -0.xxxx
    }
  },
  "network": {
    "organization_name": "Microsoft Corporation",
    "asn": 8075,
    "network": "4.224.0.0/11"
  }
}
That's quite a few interesting-looking details… each search result has a wealth of information about a computer somewhere on the internet which had a service listening on port 11434 at the time that leakix.net last ran a port-scan against it. It even collects the HTTP server response, which in this case ends with "Ollama is running", a pretty good indication that we have indeed found what we are looking for. And as we can see, there are quite a few, and now we know their IP addresses.
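Since the banner is the strongest hint that a record really is an Ollama server, one way to narrow the results down is to keep only the records whose summary ends with that greeting. The sketch below assumes the record layout shown above (the `ip`, `port`, and `summary` fields); the function name `ollama_hosts` and the sample addresses are made up for illustration.

```python
import json
from typing import Iterable


def ollama_hosts(lines: Iterable[str]) -> list[tuple[str, int]]:
    """Return unique (ip, port) pairs whose banner ends with 'Ollama is running'."""
    seen: set[tuple[str, int]] = set()
    hosts: list[tuple[str, int]] = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip a truncated or corrupt line
        if not isinstance(record, dict):
            continue
        summary = record.get("summary") or ""
        if not summary.rstrip().endswith("Ollama is running"):
            continue  # something else is listening on this port
        try:
            key = (str(record.get("ip") or ""), int(record.get("port") or 0))
        except ValueError:
            continue  # port field is not a number
        if key[0] and key not in seen:
            seen.add(key)
            hosts.append(key)
    return hosts


# Hand-written sample records using documentation-only addresses.
sample = [
    json.dumps({"ip": "203.0.113.7", "port": "11434",
                "summary": "HTTP/1.1 200 OK\r\n\r\nOllama is running"}),
    json.dumps({"ip": "203.0.113.7", "port": "11434",
                "summary": "HTTP/1.1 200 OK\r\n\r\nOllama is running"}),
    json.dumps({"ip": "203.0.113.9", "port": "11434",
                "summary": "HTTP/1.1 404 Not Found"}),
]
print(ollama_hosts(sample))
```

To run it on the real output, pass an open file instead of the sample list, for example `ollama_hosts(open("results-port-11434.jsonl", encoding="utf-8"))`.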
Surveying The Servers
So now that we have our list of potentially unsecured Ollama servers' IP addresses, of course we would like to know more about them. Which of them are still up and running? Are they really running Ollama, or is some other service listening on the same obscure, high-numbered port?
Ollama clients have various subcommands that translate into different requests which are forwarded to the server, printing the server's reply to the client command's standard output before exiting. We want a command that doesn't make any changes on the server; just in case the server is in use by an authorized user, we should try to avoid doing anything that might disrupt whatever they are doing.
The list and ps subcommands (see ollama --help for a full list with brief descriptions of each) are fairly safe, as they both just report back information about the server without changing anything. The first, list, tells us about the models that are downloaded and cached, meaning they are available to load into memory and run. It returns the names of all the models that have been fetched and are stored on the server, along with some details about each one: the family, or architecture, that it is based on; the type of quantization used to compress its weights (allowing larger models to run, with a minimal loss of fidelity, on GPUs with less on-board RAM); and the number of parameters (the effective size of the model, where more generally means more capable). The related show subcommand, which is also read-only, adds further detail about a single model, such as how many tokens fit in its context window (which effectively limits the amount of conversation history the model can reliably handle).
The ps subcommand tells us which of the available models is currently loaded into the GPU's on-board RAM, making it ready to process prompts with minimal latency. This is whatever model is currently in use, or was last used by whoever is operating the server.
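Over HTTP, list and ps correspond to GET requests to the /api/tags and /api/ps endpoints, which are the ones the survey script queries. As a rough sketch of what comes back, the helper below flattens a reply of that shape into one row per model. The field names under `details` reflect my reading of the Ollama API documentation and should be treated as assumptions; `summarize_models` and the sample payload are hypothetical.

```python
def summarize_models(payload: dict) -> list[dict]:
    """Flatten an /api/tags-style reply into one small dict per model."""
    rows = []
    for m in payload.get("models") or []:
        if not isinstance(m, dict):
            continue
        details = m.get("details") or {}
        rows.append({
            "name": m.get("name") or m.get("model"),
            "family": details.get("family"),
            "parameters": details.get("parameter_size"),
            "quantization": details.get("quantization_level"),
            "size_gb": round((m.get("size") or 0) / 1e9, 1),  # bytes on disk -> GB
        })
    return rows


# Illustrative payload, hand-written for this example (not captured from a server).
sample_tags = {
    "models": [
        {
            "name": "llama3:8b",
            "size": 4661224676,
            "details": {
                "family": "llama",
                "parameter_size": "8B",
                "quantization_level": "Q4_0",
            },
        }
    ]
}
for row in summarize_models(sample_tags):
    print(row)
```

The same helper works on a ps reply, since both wrap their entries in a top-level `models` list; missing fields simply come back as None.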
So, we want to go down the list of servers that were found with our search and, for each one, try connecting as an Ollama client and run these two read-only commands, tabulating whatever responses we get. This is definitely another job for a Python script. The one I wrote is available here and reproduced below:
"""
This script reads the output file written by
[this script](https://gist.github.com/un1tz3r0/8ebd1295aa3a9b85eb65ece1d226b3a8),
and then attempts to connect and query the models available on each host found
by the `leakix.net` service port search that it does. It writes the results to
a file `scan_results.jsonl` in the current directory for further processing.
**DISCLAIMER**
Depending on where you live, this may be illegal, or is at least frowned upon
by many. It is therefore my stern reccomendation that under no circumstances
should you or anyone else actually run this code or any variation or derivative
of it, not for any purpose whatsoever, without checking the laws which govern
your geographical region. Running it also will almost certainly constitute a
serious violation of the terms of service upon which your ISP conditionally
provides you internet access.
I, the author <un1tz3r0@gmail.com>, disclaim any responsibility and liability
for any damages or injury to property, persons or pride and reputation as a
direct or indirect result of the use of this code. This code is made available
for informative and entertainment purposes only and without any warranty or
guarantee of fitness for any purpose safe or free from back-doors, spyware and
rootkits that provide remote access to your network to the CIA. This code may
cause your toilet to keep filling unless you jiggle the handle.
- VMC <un1tz3r0@gmail.com>
June 8th, 2026
"""
import asyncio
import json
import time
from datetime import datetime, timezone
from pathlib import Path
import aiofiles
import aiohttp
import click
from rich.console import Group
from rich.live import Live
from rich.progress import (
BarColumn,
MofNCompleteColumn,
Progress,
SpinnerColumn,
TextColumn,
TimeElapsedColumn,
TimeRemainingColumn,
)
from rich.table import Table
class WorkerState:
__slots__ = ("wid", "host", "stage", "started", "last_result")
def __init__(self, wid: int) -> None:
self.wid = wid
self.host = ""
self.stage = "idle"
self.started: float | None = None
self.last_result = ""
def begin(self, host: str) -> None:
self.host = host
self.stage = "connect"
self.started = time.monotonic()
def to(self, stage: str) -> None:
self.stage = stage
def finish(self, summary: str) -> None:
self.stage = "idle"
self.host = ""
self.started = None
self.last_result = summary
async def call_endpoint(
session: aiohttp.ClientSession,
url: str,
timeout: aiohttp.ClientTimeout,
) -> tuple[object | None, int]:
t0 = time.monotonic()
result: object | None = None
try:
async with session.get(url, timeout=timeout) as r:
if r.status == 200:
try:
result = await r.json(content_type=None)
except (json.JSONDecodeError, aiohttp.ContentTypeError, UnicodeDecodeError):
result = None
except (aiohttp.ClientError, asyncio.TimeoutError, OSError):
result = None
elapsed_ms = int((time.monotonic() - t0) * 1000)
return result, elapsed_ms
async def post_endpoint(
session: aiohttp.ClientSession,
url: str,
payload: dict,
timeout: aiohttp.ClientTimeout,
) -> tuple[object | None, int]:
t0 = time.monotonic()
result: object | None = None
try:
async with session.post(url, json=payload, timeout=timeout) as r:
if r.status == 200:
try:
result = await r.json(content_type=None)
except (json.JSONDecodeError, aiohttp.ContentTypeError, UnicodeDecodeError):
result = None
except (aiohttp.ClientError, asyncio.TimeoutError, OSError):
result = None
elapsed_ms = int((time.monotonic() - t0) * 1000)
return result, elapsed_ms
def extract_model_names(result: object | None) -> list[str]:
if not isinstance(result, dict):
return []
models = result.get("models")
if not isinstance(models, list):
return []
names: list[str] = []
for m in models:
if not isinstance(m, dict):
continue
name = m.get("name") or m.get("model")
if isinstance(name, str) and name:
names.append(name)
return names
async def probe_host(
host: str,
port: int,
connect_timeout: float,
read_timeout: float,
state: WorkerState,
seen_models: dict,
models_lock: asyncio.Lock,
) -> dict:
base = f"http://{host}:{port}"
timeout = aiohttp.ClientTimeout(
total=None,
sock_connect=connect_timeout,
sock_read=read_timeout,
)
list_result: object | None = None
list_time: int | None = None
ps_result: object | None = None
ps_time: int | None = None
connector = aiohttp.TCPConnector(limit=2, force_close=True)
try:
async with aiohttp.ClientSession(connector=connector) as session:
state.to("list")
list_result, list_time = await call_endpoint(
session, f"{base}/api/tags", timeout
)
state.to("ps")
ps_result, ps_time = await call_endpoint(
session, f"{base}/api/ps", timeout
)
list_names = extract_model_names(list_result)
ps_names = extract_model_names(ps_result)
needs_show: list[str] = []
if list_names or ps_names:
async with models_lock:
for name in list_names:
entry = seen_models.get(name)
if entry is None:
entry = [set(), set(), None]
seen_models[name] = entry
entry[0].add(host)
if entry[2] is None and name not in needs_show:
needs_show.append(name)
for name in ps_names:
entry = seen_models.get(name)
if entry is None:
entry = [set(), set(), None]
seen_models[name] = entry
entry[1].add(host)
if entry[2] is None and name not in needs_show:
needs_show.append(name)
if needs_show:
state.to("show")
for name in needs_show:
info, _ = await post_endpoint(
session, f"{base}/api/show", {"model": name}, timeout
)
if not isinstance(info, dict):
continue
async with models_lock:
entry = seen_models.get(name)
if entry is not None and entry[2] is None:
entry[2] = info
except Exception:
pass
return {
"host": host,
"list_result": list_result,
"list_time": list_time,
"ps_result": ps_result,
"ps_time": ps_time,
"timestamp": datetime.now(timezone.utc).isoformat(),
}

async def worker(
    state: WorkerState,
    queue: "asyncio.Queue[str | None]",
    write_q: "asyncio.Queue[dict | None]",
    counters: dict,
    port: int,
    connect_timeout: float,
    read_timeout: float,
    seen_models: dict,
    models_lock: asyncio.Lock,
) -> None:
    while True:
        host = await queue.get()
        if host is None:
            queue.task_done()
            return
        state.begin(host)
        try:
            result = await probe_host(
                host, port, connect_timeout, read_timeout, state,
                seen_models, models_lock,
            )
            if result["list_result"] is not None or result["ps_result"] is not None:
                counters["ok"] += 1
                state.finish(f"ok {host}")
            else:
                counters["fail"] += 1
                state.finish(f"fail {host}")
        except Exception as e:
            counters["fail"] += 1
            state.finish(f"err {host}: {type(e).__name__}")
            result = {
                "host": host,
                "list_result": None,
                "list_time": None,
                "ps_result": None,
                "ps_time": None,
                "timestamp": datetime.now(timezone.utc).isoformat(),
            }
        await write_q.put(result)
        counters["done"] += 1
        queue.task_done()

async def writer_task(
    write_q: "asyncio.Queue[dict | None]", path: Path
) -> None:
    async with aiofiles.open(path, "a") as f:
        while True:
            item = await write_q.get()
            if item is None:
                write_q.task_done()
                return
            await f.write(json.dumps(item, ensure_ascii=False) + "\n")
            await f.flush()
            write_q.task_done()

async def dump_models(
    seen_models: dict,
    models_lock: asyncio.Lock,
    path: Path,
    backup: Path,
) -> None:
    async with models_lock:
        snapshot = [
            (name, sorted(entry[0]), sorted(entry[1]), entry[2])
            for name, entry in seen_models.items()
        ]
    if not snapshot:
        return
    lines = [
        json.dumps(
            [name, [list_servers, ps_servers, info]],
            ensure_ascii=False,
        )
        for name, list_servers, ps_servers, info in snapshot
    ]
    if backup.exists():
        backup.unlink()
    if path.exists():
        path.rename(backup)
    async with aiofiles.open(path, "w") as f:
        await f.write("\n".join(lines) + "\n")
        await f.flush()
    if backup.exists():
        backup.unlink()

async def models_dumper(
    seen_models: dict,
    models_lock: asyncio.Lock,
    path: Path,
    interval: float,
    stop_event: asyncio.Event,
) -> None:
    backup = Path(str(path) + ".bak")
    next_dump = time.monotonic() + interval
    while not stop_event.is_set():
        wait = max(0.0, next_dump - time.monotonic())
        try:
            await asyncio.wait_for(stop_event.wait(), timeout=wait)
        except asyncio.TimeoutError:
            pass
        if stop_event.is_set():
            break
        await dump_models(seen_models, models_lock, path, backup)
        next_dump = time.monotonic() + interval
    await dump_models(seen_models, models_lock, path, backup)

def render(
    progress: Progress,
    workers: list[WorkerState],
    counters: dict,
) -> Group:
    table = Table(expand=True, show_edge=False, pad_edge=False)
    table.add_column("W", justify="right", style="cyan", no_wrap=True, width=4)
    table.add_column("Host", no_wrap=True, width=22)
    table.add_column("Stage", no_wrap=True, width=8)
    table.add_column("Elapsed", justify="right", no_wrap=True, width=8)
    table.add_column("Last", overflow="ellipsis", no_wrap=True)
    now = time.monotonic()
    stage_style = {
        "idle": "dim",
        "connect": "yellow",
        "list": "blue",
        "ps": "magenta",
        "show": "cyan",
    }
    for w in workers:
        elapsed = f"{(now - w.started):.2f}s" if w.started else "-"
        color = stage_style.get(w.stage, "")
        stage_cell = f"[{color}]{w.stage}[/]" if color else w.stage
        last_color = ""
        if w.last_result.startswith("ok"):
            last_color = "green"
        elif w.last_result.startswith(("fail", "err")):
            last_color = "red"
        last_cell = (
            f"[{last_color}]{w.last_result}[/]" if last_color else w.last_result
        )
        table.add_row(str(w.wid), w.host or "-", stage_cell, elapsed, last_cell)
    summary = (
        f"[green]ok:[/] {counters['ok']} "
        f"[red]fail:[/] {counters['fail']} "
        f"[cyan]done:[/] {counters['done']}/{counters['total']}"
    )
    return Group(progress, summary, table)

async def refresher(
    live: Live, interval: float, render_fn
) -> None:
    try:
        while True:
            live.update(render_fn(), refresh=True)
            await asyncio.sleep(interval)
    except asyncio.CancelledError:
        pass

async def amain(
    input_path: Path,
    output_path: Path,
    models_output_path: Path,
    concurrency: int,
    connect_timeout: float,
    read_timeout: float,
    port: int,
    update_interval: float,
    dump_interval: float,
) -> None:
    hosts: list[str] = []
    seen: set[str] = set()
    for line in input_path.read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        try:
            obj = json.loads(line)
        except json.JSONDecodeError:
            click.echo(f"skipping non-JSON line: {line[:80]}", err=True)
            continue
        ip = obj.get("ip") if isinstance(obj, dict) else None
        if not ip or ip in seen:
            continue
        seen.add(ip)
        hosts.append(ip)
    if not hosts:
        click.echo("no hosts to scan", err=True)
        return
    queue: asyncio.Queue[str | None] = asyncio.Queue()
    for h in hosts:
        queue.put_nowait(h)
    for _ in range(concurrency):
        queue.put_nowait(None)
    write_q: asyncio.Queue[dict | None] = asyncio.Queue()
    counters = {"ok": 0, "fail": 0, "done": 0, "total": len(hosts)}
    workers = [WorkerState(i) for i in range(concurrency)]
    seen_models: dict = {}
    models_lock = asyncio.Lock()
    stop_event = asyncio.Event()
    progress = Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
        TextColumn("•"),
        TimeElapsedColumn(),
        TextColumn("eta"),
        TimeRemainingColumn(),
        expand=True,
    )
    task_id = progress.add_task("scanning", total=len(hosts))

    def render_fn() -> Group:
        progress.update(task_id, completed=counters["done"])
        return render(progress, workers, counters)

    with Live(render_fn(), auto_refresh=False) as live:
        ref = asyncio.create_task(refresher(live, update_interval, render_fn))
        writer = asyncio.create_task(writer_task(write_q, output_path))
        dumper = asyncio.create_task(
            models_dumper(
                seen_models, models_lock, models_output_path,
                dump_interval, stop_event,
            )
        )
        worker_tasks = [
            asyncio.create_task(
                worker(
                    workers[i],
                    queue,
                    write_q,
                    counters,
                    port,
                    connect_timeout,
                    read_timeout,
                    seen_models,
                    models_lock,
                )
            )
            for i in range(concurrency)
        ]
        try:
            await asyncio.gather(*worker_tasks)
        finally:
            stop_event.set()
            await dumper
            await write_q.put(None)
            await writer
            ref.cancel()
            try:
                await ref
            except asyncio.CancelledError:
                pass
            live.update(render_fn(), refresh=True)
    click.echo(
        f"\nDone. ok={counters['ok']} fail={counters['fail']} "
        f"total={counters['total']} output={output_path} "
        f"models={len(seen_models)} -> {models_output_path}"
    )

@click.command()
@click.option(
    "--input",
    "-i",
    "input_file",
    type=click.Path(exists=True, dir_okay=False, path_type=Path),
    default=Path("results-port-11434.jsonl"),
    show_default=True,
    help="Input JSONL file (from 1_search.py); the 'ip' field is extracted from each line.",
)
@click.option(
    "--output",
    "-o",
    "output_file",
    type=click.Path(dir_okay=False, path_type=Path),
    default=Path("connect-results.jsonl"),
    show_default=True,
    help="Output JSONL file (appended).",
)
@click.option(
    "--concurrency",
    "-c",
    type=int,
    default=50,
    show_default=True,
    help="Number of concurrent worker tasks.",
)
@click.option(
    "--connect-timeout",
    type=float,
    default=5.0,
    show_default=True,
    help="Per-request TCP connect timeout in seconds.",
)
@click.option(
    "--read-timeout",
    type=float,
    default=10.0,
    show_default=True,
    help="Per-request socket read timeout in seconds.",
)
@click.option(
    "--port",
    "-p",
    type=int,
    default=11434,
    show_default=True,
    help="TCP port to connect on.",
)
@click.option(
    "--update-interval",
    type=float,
    default=0.5,
    show_default=True,
    help="Status display refresh interval in seconds.",
)
@click.option(
    "--models-output",
    "models_output_file",
    type=click.Path(dir_okay=False, path_type=Path),
    default=Path("scan-results.jsonl"),
    show_default=True,
    help="Per-model aggregation output (overwritten each dump).",
)
@click.option(
    "--dump-interval",
    type=float,
    default=5.0,
    show_default=True,
    help="How often (seconds) to rewrite the models-output file.",
)
def main(
    input_file: Path,
    output_file: Path,
    concurrency: int,
    connect_timeout: float,
    read_timeout: float,
    port: int,
    update_interval: float,
    models_output_file: Path,
    dump_interval: float,
) -> None:
    asyncio.run(
        amain(
            input_file,
            output_file,
            models_output_file,
            concurrency,
            connect_timeout,
            read_timeout,
            port,
            update_interval,
            dump_interval,
        )
    )


if __name__ == "__main__":
    main()

The script above reads the results-port-11434.jsonl file created by the first script and queues up each unique IP address returned by the search. A pool of concurrently running worker tasks takes addresses off the queue one at a time, asks each host for its installed models (/api/tags) and its currently loaded models (/api/ps), and fetches details for newly seen models from /api/show. The results are tabulated and written to a pair of output files, connect-results.jsonl and scan-results.jsonl.
You may be asking yourself, "Why all the extra complexity? Why not just process them sequentially with a single, simple for loop?"
The answer is, of course, performance. Most of the time it takes to run our commands and get the servers' responses is spent doing nothing but waiting to receive an answer. While we are waiting for one server's answer, we could just as easily be sending a request to another. So long as we keep track of which socket is which, so that we know what IP address corresponds to each response, we can manage quite a few connections simultaneously without coming close to saturating our Internet connection's available bandwidth.
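To make the difference concrete, here is a minimal sketch that swaps the real network calls for a simulated delay. The `fake_probe` helper and its 50 ms latency are assumptions made up for illustration (they are not part of the scanner), and the addresses come from a range reserved for documentation.

```python
import asyncio
import time


async def fake_probe(host: str, latency: float = 0.05) -> str:
    # Hypothetical stand-in for a network round trip: the task only waits.
    await asyncio.sleep(latency)
    return f"{host}: done"


async def run_sequential(hosts: list[str]) -> list[str]:
    # One host at a time: total time is roughly len(hosts) * latency.
    return [await fake_probe(h) for h in hosts]


async def run_concurrent(hosts: list[str]) -> list[str]:
    # gather() starts every probe at once and returns results in input order.
    return await asyncio.gather(*(fake_probe(h) for h in hosts))


def timed(coro) -> tuple[list[str], float]:
    # Run a coroutine to completion and report how long it took.
    t0 = time.monotonic()
    result = asyncio.run(coro)
    return result, time.monotonic() - t0


hosts = [f"192.0.2.{i}" for i in range(1, 21)]  # documentation-only addresses
seq_result, seq_time = timed(run_sequential(hosts))
con_result, con_time = timed(run_concurrent(hosts))
print(f"sequential: {seq_time:.2f}s  concurrent: {con_time:.2f}s")
```

Both runs return the same answers, but the sequential one pays for every wait back to back while the concurrent one overlaps them. The real script caps the overlap with a fixed number of workers rather than launching everything at once.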
Parallel Pool Party
While our script is running, the workers' activity is shown in the terminal as a table listing every worker, the address it is currently interrogating, and the stage it has reached. Every network operation is configured with timeouts, so a poor connection or a crashed or misbehaving server cannot occupy a worker forever, and we never end up with all of the workers stuck waiting on operations that cannot complete. Once every candidate server has been tried, the workers are signaled to stop waiting for additional jobs and exit before the script itself ends.
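The two ideas in that paragraph, a per-job timeout and a stop signal for the pool, can be sketched in isolation like this. It is a simplified illustration rather than the scanner itself: `slow_or_fast` is a made-up probe, and the timeout here is applied with `asyncio.wait_for` instead of the aiohttp socket timeouts the real script uses.

```python
import asyncio


async def slow_or_fast(host: str) -> str:
    # Hypothetical probe: hosts ending in ".99" never answer in time.
    await asyncio.sleep(5.0 if host.endswith(".99") else 0.01)
    return "ok"


async def pool_worker(queue: asyncio.Queue, results: dict, timeout: float) -> None:
    while True:
        host = await queue.get()
        if host is None:  # sentinel: no more work for this worker
            queue.task_done()
            return
        try:
            # Give up on the host if it does not answer within `timeout` seconds.
            results[host] = await asyncio.wait_for(slow_or_fast(host), timeout)
        except asyncio.TimeoutError:
            results[host] = "timeout"
        finally:
            queue.task_done()


async def run_pool(hosts: list[str], concurrency: int = 3, timeout: float = 0.2) -> dict:
    queue: asyncio.Queue = asyncio.Queue()
    for h in hosts:
        queue.put_nowait(h)
    for _ in range(concurrency):
        queue.put_nowait(None)  # one sentinel per worker, queued after the jobs
    results: dict = {}
    await asyncio.gather(
        *(pool_worker(queue, results, timeout) for _ in range(concurrency))
    )
    return results


results = asyncio.run(
    run_pool(["192.0.2.1", "192.0.2.2", "192.0.2.99", "192.0.2.3"])
)
print(results)
```

Because the sentinels sit behind the real jobs in the queue, each worker only sees one after all the hosts have been handed out, which is how the pool drains cleanly without any extra coordination.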
The results of our scan are collated and saved in a pair of files, scan-results.jsonl and connect-results.jsonl, which are intended to be read and processed offline by a pair of scripts called figures.py and report.py. These tally the results and emit a handful of interesting statistics, rendered as charts and graphs, plus an interactive HTML report for browsing and sorting detailed tables of the results. They are available along with the first two scripts here: https://github.com/un1tz3r0/leakix-ollama-search.
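For a rough idea of what that offline processing involves, here is a small sketch that tallies records shaped like the lines the scanner writes to connect-results.jsonl. It is not taken from figures.py or report.py, and the sample records and model names are invented for illustration.

```python
import json
from collections import Counter

# Made-up records in the shape the scanner writes, one JSON object per line.
sample_lines = [
    '{"host": "192.0.2.1", "list_result": {"models": [{"name": "llama3:8b"}]}, "ps_result": {"models": []}}',
    '{"host": "192.0.2.2", "list_result": null, "ps_result": null}',
    '{"host": "192.0.2.3", "list_result": {"models": [{"name": "llama3:8b"}, {"name": "qwen2:7b"}]}, "ps_result": null}',
]


def tally(lines):
    """Count hosts that answered and how many hosts list each model."""
    reachable = 0
    models = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue
        rec = json.loads(line)
        listing = rec.get("list_result")
        if listing is None and rec.get("ps_result") is None:
            continue  # neither endpoint answered: the host counts as a failure
        reachable += 1
        if isinstance(listing, dict):
            for m in listing.get("models") or []:
                name = m.get("name") or m.get("model")
                if name:
                    models[name] += 1
    return reachable, models


reachable, models = tally(sample_lines)
print(reachable, models.most_common())
```

To run it against a real scan, the sample list could be replaced with the lines of the output file, for example `Path("connect-results.jsonl").read_text().splitlines()`.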
What Did We Learn?
Here are a few interesting statistics which the above two scripts generate. The first is a plot of the most popular model families:
As we can see, Meta's Llama-based models are the most popular, with Qwen being second. Surprisingly, Google's capable Gemma models don't even make it into the top 5. Next up is model size:
Not surprisingly, the 3.2B-10B bracket is the most popular by far, which fits with Ollama's target audience being individuals running consumer graphics cards which have at most 24GB of VRAM, and many being limited to 16GB. Another statistic is quantization types:
We can see that Q4_K_M seems to be the preferred method, with 8- and 16-bit floating-point being next, followed by several other 4-bit schemes. Interestingly, 5-bit quantization is barely in use at all, probably because 4-bit performance is sufficient. Our final plot shows popularity by country:
Interesting, not what I expected to see at all.
Conclusions
This project puts a spotlight on some serious security issues with one of the most popular pieces of software used by machine learning researchers and enthusiasts. Hopefully it will inspire anyone who is running ML software like Ollama on managed hosting or internet-connected servers to give some thought to securing their operations in the future.
Full source code for the scripts presented in this article, along with the scripts that generate the reports and plots, is available on GitHub: https://github.com/un1tz3r0/leakix-ollama-search