forked from lthn/LEM

Merge pull request 'Add generation worker for distributed training data pipeline' (#1) from Charon/LEM:feat/generation-worker into main

Reviewed-on: lthn/LEM#1
Reviewed-by: Snider <snider@noreply.forge.lthn.ai>
Commit d722ba1b3d by Snider, 2026-02-14 22:48:26 +00:00
8 changed files with 63239 additions and 0 deletions

3
.gitignore vendored

@@ -2,3 +2,6 @@
.idea/
__pycache__/
*.pyc
# Worker output (generated locally, not committed)
worker/output/

README.md

@@ -42,6 +42,7 @@ seeds/ # P01-P100 evaluation prompts
training/ # Training data (1,839 train, 229 valid, 231 test)
scripts/ # Benchmark and scoring scripts
benchmarks/ # Standard benchmark data + results + scores
worker/ # Generation worker (join the training data pipeline)
```
## Reproduce
@@ -102,6 +103,73 @@ The ethical kernel is 9,189 characters built on 5 axioms:
The kernel is in `kernel/lek-1-kernel.txt`. The structured axioms are in `kernel/axioms.json`.
## Join the Generation Train
We're building an 87K+ training dataset across 22K domains and global regions. You can contribute compute from any Apple Silicon Mac, or from other hardware via the OpenAI-compatible API backend described below.
### Quick Start
```bash
cd worker
bash setup.sh # install deps, check connectivity
```
### 1. Get your InfluxDB token
Workers coordinate via InfluxDB so no work is duplicated. Get a token from the team and save it:
```bash
echo 'YOUR_TOKEN_HERE' > ~/.influx_token
```
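The worker scripts check the `INFLUX_TOKEN` environment variable first and fall back to `~/.influx_token`; a condensed sketch of that lookup (see `get_influx_token()` in `lem_generate.py` / `lem_expand.py`):

```python
# Token resolution used by both workers: env var first, then ~/.influx_token.
import os
from pathlib import Path

def get_influx_token() -> str:
    if tok := os.environ.get("INFLUX_TOKEN"):
        return tok
    token_path = Path.home() / ".influx_token"
    return token_path.read_text().strip() if token_path.exists() else ""
```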
### 2. Gold Generation (finish the 15K golden set)
Uses axiom sandwich signing (system prompt + kernel postfix) on a base model:
```bash
cd worker
# Check what's left to do
python3 lem_generate.py --dry-run
# Start generating (default: gemma-3-12b, good for 16GB+ RAM)
python3 lem_generate.py --worker my-m1-gold
# For 8GB machines, use the 4B model
python3 lem_generate.py --worker my-m1-gold --model mlx-community/gemma-3-4b-it-qat-4bit
```
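The "sandwich" places the structured axioms in the system message and appends the full kernel text after the prompt. A condensed sketch of the message layout built by `generate_response()` in `lem_generate.py` (the `build_sandwich` helper name is illustrative, not in the repo):

```python
# Axiom sandwich (condensed from generate_response() in worker/lem_generate.py):
# the structured axioms become the system message, the full kernel text is
# appended after the user's prompt, and the base model answers as usual.
def build_sandwich(system_text: str, kernel_text: str, prompt: str) -> list[dict]:
    return [
        {"role": "system", "content": system_text},  # "You are guided by the following axioms..."
        {
            "role": "user",
            "content": f"{prompt}\n\n---\n\n"
                       f"Consider this ethical framework in your response:\n{kernel_text}",
        },
    ]

# messages = build_sandwich(system_text, kernel_text, seed["prompt"])
# text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
```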
### 3. Expansion Generation (46K+ prompts, post-training)
Once LEM models are trained on the golden set, expansion uses the trained model directly (no sandwich):
```bash
cd worker
# Check status
python3 lem_expand.py --dry-run
# Start expanding
python3 lem_expand.py --worker my-m1-expand
# Or use an API backend (llama.cpp, Ollama, etc.)
python3 lem_expand.py --backend api --api-url http://localhost:8080/v1
```
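The `api` backend sends one OpenAI-compatible chat completion request per prompt; a minimal sketch of what `generate_api()` in `lem_expand.py` posts, using the example URL and default parameters from above:

```python
# Condensed from generate_api() in worker/lem_expand.py: one OpenAI-compatible
# chat completion per prompt; values shown are the defaults / example flags.
import json
import urllib.request

prompt = "Example expansion prompt"
payload = {
    "model": "default",                                  # --api-model
    "messages": [{"role": "user", "content": prompt}],   # no sandwich: the trained model carries the framework
    "max_tokens": 512,                                   # MAX_TOKENS
    "temperature": 0.3,                                  # TEMPERATURE
}
req = urllib.request.Request(
    "http://localhost:8080/v1/chat/completions",         # --api-url + /chat/completions
    data=json.dumps(payload).encode(),
    headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req, timeout=120) as resp:
    response = json.loads(resp.read())["choices"][0]["message"]["content"]
```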
### Model Recommendations by RAM
| RAM | Model | Flag |
|-----|-------|------|
| 8GB | Gemma 3 4B (QAT 4-bit) | `--model mlx-community/gemma-3-4b-it-qat-4bit` |
| 16GB | Gemma 3 12B (QAT 4-bit) | `--model mlx-community/gemma-3-12b-it-qat-4bit` (default) |
| 32GB+ | Gemma 3 27B (QAT 4-bit) | `--model mlx-community/gemma-3-27b-it-qat-4bit` |
### Network Requirements
Workers need access to InfluxDB at `10.69.69.165:8181` (lab network, VLAN 69). If you're remote, use VPN.
Output is saved locally to `worker/output/` and reported to InfluxDB. Ctrl+C to stop safely at any time — progress is tracked per-prompt, so you can resume where you left off.
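Resuming works because every completed prompt index is written to InfluxDB as a tagged point; on startup (and every 25 generations) a worker re-queries the set of completed indices and only works on the rest. A condensed, self-contained version of that check (mirrors `get_completed_indices()` in the worker scripts; the defaults below are the lab values):

```python
# Which prompt indices are already done, across all workers?
import json
import urllib.request

def get_completed_indices(token, measurement="gold_gen",
                          influx_url="http://10.69.69.165:8181", db="training"):
    body = json.dumps({"db": db, "q": f"SELECT DISTINCT i FROM {measurement}"}).encode()
    req = urllib.request.Request(
        f"{influx_url}/api/v3/query_sql", data=body,
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        return {int(r["i"]) for r in json.loads(resp.read()) if r.get("i") is not None}

# remaining work = every target index no worker has reported yet
# remaining = [i for i in range(target) if i not in get_completed_indices(token)]
```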
## License
EUPL-1.2 — European Union Public Licence. Compatible with Apache 2.0, GPL, MPL.

worker/data/expansion-prompts.jsonl Normal file

File diff suppressed because it is too large

16000
worker/data/gold-prompts.jsonl Normal file

File diff suppressed because it is too large

384
worker/lem_expand.py Executable file

@@ -0,0 +1,384 @@
#!/usr/bin/env python3
"""
LEM Expansion Generator: InfluxDB-coordinated worker
====================================================
Generates responses using trained LEM models (no sandwich signing needed).
The trained models have internalized the ethical framework via LoRA.
Multiple workers can run in parallel, coordinating via InfluxDB.
Backends:
- mlx: MLX on Apple Silicon (M1/M2/M3)
- api: OpenAI-compatible API (llama.cpp, vLLM, Ollama, etc.)
Usage:
python3 lem_expand.py # MLX, auto-detect
python3 lem_expand.py --backend api --api-url http://localhost:8090/v1
python3 lem_expand.py --worker m1-expand # named worker
python3 lem_expand.py --dry-run # show plan
python3 lem_expand.py --limit 100 # generate N then stop
"""
import argparse
import json
import os
import socket
import sys
import time
import urllib.request
import urllib.error
from pathlib import Path
# ── Paths (relative to this script) ─────────────────────────────────────
SCRIPT_DIR = Path(__file__).parent
DATA_DIR = SCRIPT_DIR / "data"
OUTPUT_DIR = SCRIPT_DIR / "output"
PROMPTS_PATH = DATA_DIR / "expansion-prompts.jsonl"
# ── Generation parameters ─────────────────────────────────────────────────
MAX_TOKENS = 512
TEMPERATURE = 0.3
# ── InfluxDB ──────────────────────────────────────────────────────────────
INFLUX_URL = os.environ.get("INFLUX_URL", "http://10.69.69.165:8181")
INFLUX_DB = os.environ.get("INFLUX_DB", "training")
INFLUX_TOKEN_PATH = Path.home() / ".influx_token"
REFRESH_EVERY = 25
def get_influx_token():
if tok := os.environ.get("INFLUX_TOKEN"):
return tok
if INFLUX_TOKEN_PATH.exists():
return INFLUX_TOKEN_PATH.read_text().strip()
print(f"Warning: no InfluxDB token found at {INFLUX_TOKEN_PATH} or INFLUX_TOKEN env")
return ""
def influx_query(token, sql):
body = json.dumps({"db": INFLUX_DB, "q": sql}).encode()
req = urllib.request.Request(
f"{INFLUX_URL}/api/v3/query_sql",
data=body,
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
},
)
try:
with urllib.request.urlopen(req, timeout=10) as resp:
return json.loads(resp.read())
except (urllib.error.URLError, OSError) as e:
print(f"InfluxDB query error: {e}")
return []
def influx_write(token, lines):
body = "\n".join(lines).encode()
req = urllib.request.Request(
f"{INFLUX_URL}/api/v3/write_lp?db={INFLUX_DB}",
data=body,
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "text/plain",
},
method="POST",
)
try:
urllib.request.urlopen(req, timeout=10)
return True
except (urllib.error.URLError, OSError) as e:
print(f"InfluxDB write error: {e}")
return False
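# Tag values in InfluxDB line protocol must have spaces, commas, and '=' backslash-escaped.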
def _escape_lp(s):
return s.replace(" ", "\\ ").replace(",", "\\,").replace("=", "\\=")
def get_completed_indices(token):
rows = influx_query(token, "SELECT DISTINCT i FROM expansion_gen")
return {int(r["i"]) for r in rows if r.get("i") is not None}
def report_generation(token, worker, idx, seed, gen_time, response_chars, model_name):
domain = _escape_lp(seed.get("domain", "unknown"))
region = _escape_lp(seed.get("region", "unknown"))
safe_worker = _escape_lp(worker)
seed_id = seed.get("seed_id", f"EX_{idx:05d}").replace('"', '\\"')
safe_model = model_name.replace('"', '\\"')
line = (
f'expansion_gen,i={idx},w={safe_worker},d={domain},r={region} '
f'seed_id="{seed_id}",gen_time={gen_time:.1f},'
f'chars={response_chars}i,model="{safe_model}"'
)
return influx_write(token, [line])
def report_stats(token, worker, completed_count, target):
safe_worker = _escape_lp(worker)
pct = completed_count / target * 100 if target > 0 else 0
line = (
f"expansion_progress,worker={safe_worker} "
f"completed={completed_count}i,target={target}i,pct={pct:.1f}"
)
influx_write(token, [line])
def load_prompts(path):
prompts = []
with open(path) as f:
for line in f:
line = line.strip()
if line:
prompts.append(json.loads(line))
return prompts
# ── MLX Backend ──────────────────────────────────────────────────────────
def generate_mlx(model, tokenizer, sampler, prompt, max_tokens):
from mlx_lm import generate
messages = [{"role": "user", "content": prompt}]
text = tokenizer.apply_chat_template(
messages, tokenize=False, add_generation_prompt=True
)
t0 = time.time()
response = generate(
model, tokenizer, prompt=text, max_tokens=max_tokens, sampler=sampler
)
elapsed = time.time() - t0
return response, elapsed
# ── API Backend (OpenAI-compatible) ──────────────────────────────────────
def generate_api(api_url, api_model, prompt, max_tokens, temperature):
payload = {
"model": api_model,
"messages": [{"role": "user", "content": prompt}],
"max_tokens": max_tokens,
"temperature": temperature,
}
body = json.dumps(payload).encode()
req = urllib.request.Request(
f"{api_url}/chat/completions",
data=body,
headers={"Content-Type": "application/json"},
)
t0 = time.time()
with urllib.request.urlopen(req, timeout=120) as resp:
result = json.loads(resp.read())
elapsed = time.time() - t0
response = result["choices"][0]["message"]["content"]
return response, elapsed
def main():
parser = argparse.ArgumentParser(description="LEM Expansion Generator (InfluxDB coordinated)")
parser.add_argument("--worker", default=None, help="Worker ID (default: hostname-pid)")
parser.add_argument("--influx", default=None, help="InfluxDB URL")
parser.add_argument("--prompts", default=None, help="JSONL prompts file")
parser.add_argument("--output", default=None, help="JSONL output path (default: auto)")
parser.add_argument("--limit", type=int, default=0, help="Max generations (0=unlimited)")
parser.add_argument("--dry-run", action="store_true", help="Show plan without generating")
# Backend selection
parser.add_argument("--backend", default="mlx", choices=["mlx", "api"],
help="Generation backend (default: mlx)")
# MLX options
parser.add_argument("--model", default="mlx-community/gemma-3-12b-it-qat-4bit",
help="MLX model ID (for mlx backend)")
# API options
parser.add_argument("--api-url", default="http://localhost:8090/v1",
help="OpenAI-compatible API URL (for api backend)")
parser.add_argument("--api-model", default="default",
help="Model name for API backend")
# Generation parameters
parser.add_argument("--max-tokens", type=int, default=MAX_TOKENS)
parser.add_argument("--temperature", type=float, default=TEMPERATURE)
args = parser.parse_args()
global INFLUX_URL
if args.influx:
INFLUX_URL = args.influx
worker = args.worker or f"{socket.gethostname()}-{os.getpid()}"
prompts_path = Path(args.prompts) if args.prompts else PROMPTS_PATH
# ── Load token and check connectivity ─────────────────────────
token = get_influx_token()
if not token:
print("Error: no InfluxDB token available")
print("Place your token in ~/.influx_token or set INFLUX_TOKEN env var")
sys.exit(1)
test = influx_query(token, "SELECT 1 AS ok")
if not test:
print(f"Error: cannot reach InfluxDB at {INFLUX_URL}")
sys.exit(1)
print(f"InfluxDB connected: {INFLUX_URL}")
# ── Load prompts ──────────────────────────────────────────────
if not prompts_path.exists():
print(f"Error: prompts not found at {prompts_path}")
sys.exit(1)
prompts = load_prompts(prompts_path)
target = len(prompts)
print(f"Loaded {target} expansion prompts")
idx_map = {p["idx"]: p for p in prompts}
# ── Query completed from InfluxDB ─────────────────────────────
completed = get_completed_indices(token)
remaining = [p["idx"] for p in prompts if p["idx"] not in completed]
print(f"Completed: {len(completed)} | Remaining: {len(remaining)}")
if not remaining:
print("All expansion prompts already completed!")
return
if args.dry_run:
print(f"\n[DRY RUN] Would process {len(remaining)} prompts")
print(f" First 10 indices: {remaining[:10]}")
print(f" Worker: {worker}")
print(f" Backend: {args.backend}")
if args.backend == "mlx":
print(f" Model: {args.model}")
else:
print(f" API: {args.api_url} (model: {args.api_model})")
return
# ── Setup output ──────────────────────────────────────────────
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
output_path = Path(args.output) if args.output else OUTPUT_DIR / f"expand-{worker}.jsonl"
print(f"Output: {output_path}")
# ── Load backend ──────────────────────────────────────────────
mlx_model = mlx_tokenizer = mlx_sampler = None
model_name = ""
if args.backend == "mlx":
print(f"Loading MLX model: {args.model}")
from mlx_lm import load
from mlx_lm.sample_utils import make_sampler
mlx_model, mlx_tokenizer = load(args.model)
mlx_sampler = make_sampler(temp=args.temperature)
model_name = args.model.split("/")[-1] if "/" in args.model else args.model
print("Model loaded.")
else:
model_name = args.api_model
print(f"Using API backend: {args.api_url} (model: {model_name})")
# ── Generation loop ───────────────────────────────────────────
print(f"\nStarting expansion as worker '{worker}'")
print(f"{'='*60}")
batch_start = time.time()
generated = 0
errors = 0
limit = args.limit if args.limit > 0 else len(remaining)
for idx in remaining:
if generated >= limit:
break
seed = idx_map[idx]
try:
if args.backend == "mlx":
response, elapsed = generate_mlx(
mlx_model, mlx_tokenizer, mlx_sampler,
seed["prompt"], args.max_tokens
)
else:
response, elapsed = generate_api(
args.api_url, args.api_model,
seed["prompt"], args.max_tokens, args.temperature
)
result = {
"idx": idx,
"seed_id": seed.get("seed_id", f"EX_{idx:05d}"),
"region": seed.get("region", "unknown"),
"domain": seed.get("domain", "unknown"),
"prompt": seed["prompt"],
"response": response,
"gen_time": round(elapsed, 1),
"model": model_name,
"worker": worker,
}
with open(output_path, "a") as f:
f.write(json.dumps(result) + "\n")
report_generation(token, worker, idx, seed, elapsed, len(response), model_name)
generated += 1
completed.add(idx)
if generated % 10 == 0 or generated <= 5:
elapsed_total = time.time() - batch_start
rate = generated / elapsed_total if elapsed_total > 0 else 0
eta = (len(remaining) - generated) / rate if rate > 0 else 0
total_done = len(completed)
pct = total_done / target * 100
print(
f"[{total_done}/{target} {pct:.1f}%] idx={idx} "
f"| {len(response)} chars | {elapsed:.1f}s "
f"| {rate*3600:.0f}/hr | ETA: {eta/3600:.1f}h"
)
if generated % REFRESH_EVERY == 0:
new_completed = get_completed_indices(token)
new_from_others = new_completed - completed
if new_from_others:
print(f" >> {len(new_from_others)} new completions from other workers")
completed = new_completed
report_stats(token, worker, len(completed), target)
except KeyboardInterrupt:
print("\nInterrupted by user")
break
except Exception as e:
errors += 1
print(f"[ERROR] idx={idx}: {e}")
if errors > 50:
print("Too many errors, stopping.")
break
# ── Final report ──────────────────────────────────────────────
elapsed_total = time.time() - batch_start
report_stats(token, worker, len(completed), target)
print(f"\n{'='*60}")
print(f"Worker: {worker}")
print(f"Backend: {args.backend} ({model_name})")
print(f"Generated: {generated}")
print(f"Errors: {errors}")
print(f"Total: {len(completed)}/{target} ({len(completed)/target*100:.1f}%)")
if elapsed_total > 0:
print(f"Rate: {generated/elapsed_total*3600:.0f}/hr")
print(f"Time: {elapsed_total/3600:.1f}h")
print(f"Output: {output_path}")
if __name__ == "__main__":
main()

348
worker/lem_generate.py Executable file

@@ -0,0 +1,348 @@
#!/usr/bin/env python3
"""
LEM Gold Standard Generator: InfluxDB-coordinated worker
========================================================
Generates gold standard responses using axiom sandwich signing.
Multiple workers can run in parallel, coordinating via InfluxDB.
Each worker:
1. Queries InfluxDB for completed indices
2. Picks the next uncompleted index
3. Generates the response (MLX on Apple Silicon)
4. Writes result to InfluxDB + local JSONL backup
5. Refreshes completed set periodically
Usage:
python3 lem_generate.py # auto-detect everything
python3 lem_generate.py --worker m1-gpu0 # named worker
python3 lem_generate.py --model mlx-community/gemma-3-4b-it-qat-4bit # smaller model
python3 lem_generate.py --dry-run # show what would be generated
python3 lem_generate.py --limit 100 # generate N then stop
"""
import argparse
import json
import os
import socket
import sys
import time
import urllib.request
import urllib.error
from pathlib import Path
# ── Paths (relative to this script) ─────────────────────────────────────
SCRIPT_DIR = Path(__file__).parent
DATA_DIR = SCRIPT_DIR / "data"
OUTPUT_DIR = SCRIPT_DIR / "output"
KERNEL_DIR = SCRIPT_DIR.parent / "kernel"
PROMPTS_PATH = DATA_DIR / "gold-prompts.jsonl"
AXIOMS_PATH = KERNEL_DIR / "axioms.json"
KERNEL_PATH = KERNEL_DIR / "lek-1-kernel.txt"
# ── Generation parameters ─────────────────────────────────────────────────
MAX_PROMPTS = 15000
MAX_TOKENS = 512
TEMPERATURE = 0.3
# ── InfluxDB ──────────────────────────────────────────────────────────────
INFLUX_URL = os.environ.get("INFLUX_URL", "http://10.69.69.165:8181")
INFLUX_DB = os.environ.get("INFLUX_DB", "training")
INFLUX_TOKEN_PATH = Path.home() / ".influx_token"
REFRESH_EVERY = 25
def get_influx_token():
if tok := os.environ.get("INFLUX_TOKEN"):
return tok
if INFLUX_TOKEN_PATH.exists():
return INFLUX_TOKEN_PATH.read_text().strip()
print(f"Warning: no InfluxDB token found at {INFLUX_TOKEN_PATH} or INFLUX_TOKEN env")
return ""
def influx_query(token, sql):
body = json.dumps({"db": INFLUX_DB, "q": sql}).encode()
req = urllib.request.Request(
f"{INFLUX_URL}/api/v3/query_sql",
data=body,
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
},
)
try:
with urllib.request.urlopen(req, timeout=10) as resp:
return json.loads(resp.read())
except (urllib.error.URLError, OSError) as e:
print(f"InfluxDB query error: {e}")
return []
def influx_write(token, lines):
body = "\n".join(lines).encode()
req = urllib.request.Request(
f"{INFLUX_URL}/api/v3/write_lp?db={INFLUX_DB}",
data=body,
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "text/plain",
},
method="POST",
)
try:
urllib.request.urlopen(req, timeout=10)
return True
except (urllib.error.URLError, OSError) as e:
print(f"InfluxDB write error: {e}")
return False
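# Tag values in InfluxDB line protocol must have spaces, commas, and '=' backslash-escaped.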
def _escape_lp(s):
return s.replace(" ", "\\ ").replace(",", "\\,").replace("=", "\\=")
def get_completed_indices(token):
rows = influx_query(token, "SELECT DISTINCT i FROM gold_gen")
return {int(r["i"]) for r in rows if r.get("i") is not None}
def report_generation(token, worker, idx, seed, gen_time, response_chars):
domain = _escape_lp(seed.get("domain", "unknown"))
voice = _escape_lp(seed.get("voice", "unknown"))
safe_worker = _escape_lp(worker)
seed_id = seed.get("seed_id", f"P_{idx:05d}").replace('"', '\\"')
line = (
f'gold_gen,i={idx},w={safe_worker},d={domain},v={voice} '
f'seed_id="{seed_id}",gen_time={gen_time:.1f},'
f'chars={response_chars}i'
)
return influx_write(token, [line])
def report_stats(token, worker, completed_count, target):
safe_worker = _escape_lp(worker)
pct = completed_count / target * 100 if target > 0 else 0
line = (
f"golden_gen_progress,worker={safe_worker} "
f"completed={completed_count}i,target={target}i,pct={pct:.1f}"
)
influx_write(token, [line])
def load_prompts():
prompts = []
with open(PROMPTS_PATH) as f:
for line in f:
line = line.strip()
if line:
prompts.append(json.loads(line))
return prompts
def load_axiom_context():
with open(AXIOMS_PATH) as f:
axioms = json.load(f)
system_text = "You are guided by the following axioms of conscious interaction:\n\n"
for ax in axioms["axioms"]:
system_text += f"Axiom {ax['id']} ({ax['name']}): {ax['statement']}\n\n"
with open(KERNEL_PATH) as f:
kernel_text = f.read().strip()
return system_text, kernel_text
def generate_response(model, tokenizer, sampler, system_text, kernel_text, prompt):
from mlx_lm import generate
user_content = (
f"{prompt}\n\n---\n\n"
f"Consider this ethical framework in your response:\n{kernel_text}"
)
messages = [
{"role": "system", "content": system_text},
{"role": "user", "content": user_content},
]
text = tokenizer.apply_chat_template(
messages, tokenize=False, add_generation_prompt=True
)
t0 = time.time()
response = generate(
model, tokenizer, prompt=text, max_tokens=MAX_TOKENS, sampler=sampler
)
elapsed = time.time() - t0
return response, elapsed
def main():
parser = argparse.ArgumentParser(description="LEM Gold Generator (InfluxDB coordinated)")
parser.add_argument("--worker", default=None, help="Worker ID (default: hostname-pid)")
parser.add_argument("--influx", default=None, help="InfluxDB URL")
parser.add_argument("--model", default="mlx-community/gemma-3-12b-it-qat-4bit",
help="MLX model ID")
parser.add_argument("--limit", type=int, default=0, help="Max generations (0=unlimited)")
parser.add_argument("--dry-run", action="store_true", help="Show plan without generating")
parser.add_argument("--output", default=None, help="JSONL output path (default: auto)")
args = parser.parse_args()
global INFLUX_URL
if args.influx:
INFLUX_URL = args.influx
worker = args.worker or f"{socket.gethostname()}-{os.getpid()}"
# ── Validate paths ─────────────────────────────────────────────
for path, desc in [(PROMPTS_PATH, "prompts"), (AXIOMS_PATH, "axioms"), (KERNEL_PATH, "kernel")]:
if not path.exists():
print(f"Error: {desc} not found at {path}")
sys.exit(1)
# ── Load token and check connectivity ─────────────────────────
token = get_influx_token()
if not token:
print("Error: no InfluxDB token available")
print("Place your token in ~/.influx_token or set INFLUX_TOKEN env var")
sys.exit(1)
test = influx_query(token, "SELECT 1 AS ok")
if not test:
print(f"Error: cannot reach InfluxDB at {INFLUX_URL}")
sys.exit(1)
print(f"InfluxDB connected: {INFLUX_URL}")
# ── Load prompts ──────────────────────────────────────────────
prompts = load_prompts()
target = min(MAX_PROMPTS, len(prompts))
print(f"Loaded {len(prompts)} prompts, targeting {target}")
# ── Query completed from InfluxDB ─────────────────────────────
completed = get_completed_indices(token)
remaining = [i for i in range(target) if i not in completed]
print(f"Completed: {len(completed)} | Remaining: {len(remaining)}")
if not remaining:
print("All target prompts already completed!")
return
if args.dry_run:
print(f"\n[DRY RUN] Would process {len(remaining)} prompts")
print(f" First 10: {remaining[:10]}")
print(f" Worker: {worker}")
print(f" Model: {args.model}")
return
# ── Setup output ──────────────────────────────────────────────
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
output_path = Path(args.output) if args.output else OUTPUT_DIR / f"gold-{worker}.jsonl"
print(f"Output: {output_path}")
# ── Load model ────────────────────────────────────────────────
print(f"Loading model: {args.model}")
from mlx_lm import load
from mlx_lm.sample_utils import make_sampler
model, tokenizer = load(args.model)
sampler = make_sampler(temp=TEMPERATURE)
print("Model loaded.")
# ── Load axiom context ────────────────────────────────────────
system_text, kernel_text = load_axiom_context()
print(f"Axiom context: {len(system_text)} + {len(kernel_text)} chars")
# ── Generation loop ───────────────────────────────────────────
print(f"\nStarting generation as worker '{worker}'")
print(f"{'='*60}")
batch_start = time.time()
generated = 0
errors = 0
limit = args.limit if args.limit > 0 else len(remaining)
for idx in remaining:
if generated >= limit:
break
seed = prompts[idx]
try:
response, elapsed = generate_response(
model, tokenizer, sampler, system_text, kernel_text, seed["prompt"]
)
result = {
"idx": idx,
"seed_id": seed.get("seed_id", f"P_{idx:05d}"),
"domain": seed.get("domain", "unknown"),
"voice": seed.get("voice", "unknown"),
"prompt": seed["prompt"],
"response": response,
"gen_time": round(elapsed, 1),
"worker": worker,
}
with open(output_path, "a") as f:
f.write(json.dumps(result) + "\n")
report_generation(token, worker, idx, seed, elapsed, len(response))
generated += 1
completed.add(idx)
if generated % 10 == 0 or generated <= 5:
elapsed_total = time.time() - batch_start
rate = generated / elapsed_total if elapsed_total > 0 else 0
eta = (len(remaining) - generated) / rate if rate > 0 else 0
total_done = len(completed)
pct = total_done / target * 100
print(
f"[{total_done}/{target} {pct:.1f}%] idx={idx} "
f"| {len(response)} chars | {elapsed:.1f}s "
f"| {rate*3600:.0f}/hr | ETA: {eta/3600:.1f}h"
)
if generated % REFRESH_EVERY == 0:
new_completed = get_completed_indices(token)
new_from_others = new_completed - completed
if new_from_others:
print(f" >> {len(new_from_others)} new completions from other workers")
completed = new_completed
report_stats(token, worker, len(completed), target)
except KeyboardInterrupt:
print("\nInterrupted by user")
break
except Exception as e:
errors += 1
print(f"[ERROR] idx={idx}: {e}")
if errors > 50:
print("Too many errors, stopping.")
break
# ── Final report ──────────────────────────────────────────────
elapsed_total = time.time() - batch_start
report_stats(token, worker, len(completed), target)
print(f"\n{'='*60}")
print(f"Worker: {worker}")
print(f"Generated: {generated}")
print(f"Errors: {errors}")
print(f"Total: {len(completed)}/{target} ({len(completed)/target*100:.1f}%)")
if elapsed_total > 0:
print(f"Rate: {generated/elapsed_total*3600:.0f}/hr")
print(f"Time: {elapsed_total/3600:.1f}h")
print(f"Output: {output_path}")
if __name__ == "__main__":
main()

2
worker/requirements.txt Normal file

@@ -0,0 +1,2 @@
mlx>=0.22.0
mlx-lm>=0.22.1

103
worker/setup.sh Executable file

@@ -0,0 +1,103 @@
#!/bin/bash
set -e
echo "=== LEM Worker Setup ==="
echo ""
# Check platform
if [[ "$(uname -s)" != "Darwin" ]] || [[ "$(uname -m)" != "arm64" ]]; then
echo "Warning: MLX requires Apple Silicon (M1/M2/M3/M4)."
echo "For non-Apple hardware, use the --backend api option with llama.cpp or Ollama."
echo ""
fi
# Check Python
if ! command -v python3 &>/dev/null; then
echo "Error: python3 not found. Install Python 3.9+."
exit 1
fi
PYVER=$(python3 -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')")
echo "Python: $PYVER"
# Install dependencies
echo ""
echo "Installing Python dependencies..."
pip3 install -r requirements.txt
# Check InfluxDB token
echo ""
if [ -f "$HOME/.influx_token" ]; then
echo "InfluxDB token: found at ~/.influx_token"
elif [ -n "$INFLUX_TOKEN" ]; then
echo "InfluxDB token: found in INFLUX_TOKEN env"
else
echo "InfluxDB token: NOT FOUND"
echo ""
echo " You need an InfluxDB token to coordinate with other workers."
echo " Get it from the team and save it:"
echo ""
echo " echo 'YOUR_TOKEN_HERE' > ~/.influx_token"
echo ""
fi
# Check InfluxDB connectivity
echo ""
INFLUX_URL="${INFLUX_URL:-http://10.69.69.165:8181}"
echo -n "InfluxDB ($INFLUX_URL): "
if python3 -c "
import urllib.request, json, os
from pathlib import Path
token = os.environ.get('INFLUX_TOKEN', '')
if not token:
tp = Path.home() / '.influx_token'
if tp.exists(): token = tp.read_text().strip()
if not token:
print('SKIP (no token)')
exit(0)
body = json.dumps({'db': 'training', 'q': 'SELECT 1 AS ok'}).encode()
req = urllib.request.Request(
f'{os.environ.get(\"INFLUX_URL\", \"http://10.69.69.165:8181\")}/api/v3/query_sql',
data=body, headers={'Authorization': f'Bearer {token}', 'Content-Type': 'application/json'})
urllib.request.urlopen(req, timeout=5)
print('OK')
" 2>/dev/null; then
:
else
echo "UNREACHABLE"
echo " Make sure you're on the lab network (VLAN 69) or have VPN access."
fi
# Check data files
echo ""
echo "Data files:"
for f in data/gold-prompts.jsonl data/expansion-prompts.jsonl; do
if [ -f "$f" ]; then
lines=$(wc -l < "$f")
size=$(du -h "$f" | cut -f1)
echo " $f: $lines prompts ($size)"
else
echo " $f: NOT FOUND"
fi
done
# Summary
echo ""
echo "=== Setup Complete ==="
echo ""
echo "Quick start:"
echo ""
echo " # Gold generation (finish the 15K golden set):"
echo " python3 lem_generate.py --worker $(hostname)-gold --dry-run"
echo " python3 lem_generate.py --worker $(hostname)-gold"
echo ""
echo " # Expansion generation (46K+ prompts, needs trained LEM model):"
echo " python3 lem_expand.py --worker $(hostname)-expand --dry-run"
echo " python3 lem_expand.py --worker $(hostname)-expand"
echo ""
echo " # Use a smaller model for limited RAM:"
echo " python3 lem_generate.py --model mlx-community/gemma-3-4b-it-qat-4bit"
echo ""
echo " # Use API backend (llama.cpp, Ollama, etc.):"
echo " python3 lem_expand.py --backend api --api-url http://localhost:8080/v1"
echo ""