In the previous chapter, I showed the Toolkit component. The Toolkit contains the function tool definitions for the LLM API, but it does not perform a tool call itself (that will be done in later chapters).
In this chapter, I show the evaluator component. The evaluator is a program to which you can send program code: a code interpreter. The focus here is an evaluator that the LLM can interact with by sending a function tool call.
Here are my two goals for this chapter. The evaluator must:
- manage a separate process for a Python interpreter,
- provide a method to send code to the interpreter and return as a string the output of the stdout and stderr.
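To make the two goals concrete, here is a minimal one-shot sketch. It is not the evaluator built in this chapter: it starts a fresh interpreter for every call instead of managing a long-lived one, and the function name run_once is an assumption made for illustration.

```python
import subprocess
import sys

def run_once(code: str) -> str:
    """Run code in a separate Python process; return stdout and stderr as one string."""
    completed = subprocess.run(
        [sys.executable, "-c", code],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr into stdout
        text=True,
        encoding="utf-8",
    )
    return completed.stdout

print(run_once("print(1 + 1)"))
```

Because every call starts a new process, variables do not survive between calls. That is the reason the rest of the chapter keeps one interpreter process alive.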
The evaluator is a complex topic, so it is best to subdivide the problem. I thought about it for some time, and came up with the following subproblems:
- echo script,
- base64-encoded chunks echo script, and
- interactive interpreter script.
Each subproblem is dealt with separately. The last section combines them into the final evaluator implementation.
Subproblem: Echo Script
An echo script reads input from stdin and writes exactly the same text back to stdout.
EchoScript
Here is the code for the echoscript.py file.
import sys

while True:
    for line in sys.stdin:
        sys.stdout.write(line)
        sys.stdout.flush()
(Note: SIGTERM will terminate a Python process running a forever loop. That is, unless the signal handler is overridden or interrupts are disabled.)
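The note about SIGTERM can be checked with a small experiment. This is a sketch under the assumption that the child installs no signal handler of its own; the function name terminate_forever_loop is hypothetical.

```python
import subprocess
import sys

def terminate_forever_loop() -> int:
    """Start a child that loops forever, terminate it, and return its exit code."""
    child = subprocess.Popen(
        [sys.executable, "-c", "import time\nwhile True:\n    time.sleep(0.1)"]
    )
    child.terminate()              # sends SIGTERM on POSIX systems
    return child.wait(timeout=10)  # returns once the child has ended

returncode = terminate_forever_loop()
print(returncode)
```

On POSIX systems a negative return code means the child was ended by a signal. On Windows, terminate() uses a different mechanism, so the value differs there.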
EchoEvaluator
The following code is written in an echoevaluator.py file.
import subprocess
import sys

class EchoEvaluator:
    def __init__(self, python_executable=None, script_path="echoscript.py"):
        if python_executable is None:
            python_executable = sys.executable
        self.p = subprocess.Popen(
            [python_executable, script_path],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding="utf-8"
        )

    def _print(self, chunk: str):
        self.p.stdin.write(chunk + "\n")
        self.p.stdin.flush()

    def echo(self, code):
        self._print(f"{code}")
        return self.p.stdout.readline()

    def __del__(self):
        self.p.terminate()
Example Use
In [1]: load "echoevaluator.py"
In [2]: # %load "echoevaluator.py"
In [3]: e = EchoEvaluator()
In [4]: e.echo("print this")
Out[4]: 'print this\n'
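The example above sends a single line. One caveat is worth seeing in code: echo() reads back exactly one line, so multi-line text comes back only partially unless more lines are read. The sketch below uses an inline stand-in for echoscript.py passed with -c (an assumption, so that the example is self-contained), and the helper name echo_two_reads is hypothetical.

```python
import subprocess
import sys

# Inline stand-in for echoscript.py (assumption: same echo behavior)
ECHO = (
    "import sys\n"
    "for line in sys.stdin:\n"
    "    sys.stdout.write(line)\n"
    "    sys.stdout.flush()\n"
)

def echo_two_reads(text: str):
    """Send text once, then read two lines back from the echo process."""
    p = subprocess.Popen(
        [sys.executable, "-c", ECHO],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        text=True,
        encoding="utf-8",
    )
    try:
        p.stdin.write(text + "\n")
        p.stdin.flush()
        first = p.stdout.readline()
        second = p.stdout.readline()
    finally:
        p.stdin.close()        # end of input lets the child exit
        p.wait(timeout=10)
        p.stdout.close()
    return first, second

first, second = echo_two_reads("x = 1\nprint(x)")
print(repr(first), repr(second))
```

A single readline() would have returned only the first of the two lines. Base64 text contains no line breaks, so encoding the code before sending it sidesteps this problem.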
Subproblem: Encoded Chunk Echo Script
An encoded chunk echo script is like an echo script, but the text is divided into base64-encoded chunks.
Base64 Encoded Chunks
Here is how to encode a string into base64 and split it into chunks of three characters.
In [1]: import base64
In [2]: base64.b64encode("Test string".encode("utf-8")).decode("ascii")
Out[2]: 'VGVzdCBzdHJpbmc='
In [3]: encoded = base64.b64encode("print this".encode("utf-8")).decode("ascii")
In [4]: chunks = [
...:     encoded[i: i + 3]
...:     for i in range(0, len(encoded), 3)
...: ]
In [5]: chunks
Out[5]: ['cHJ', 'pbn', 'Qgd', 'Ghp', 'cw=', '=']
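Splitting is only useful if the chunks can be put back together. Here is a small round-trip sketch; the function names chunk_encode and chunk_decode are chosen for illustration.

```python
import base64

def chunk_encode(text: str, size: int = 3) -> list[str]:
    """Base64-encode text and split the result into pieces of `size` characters."""
    encoded = base64.b64encode(text.encode("utf-8")).decode("ascii")
    return [encoded[i:i + size] for i in range(0, len(encoded), size)]

def chunk_decode(chunks: list[str]) -> str:
    """Join the pieces and decode them back to the original text."""
    return base64.b64decode("".join(chunks)).decode("utf-8")

chunks = chunk_encode("print this")
print(chunks)                # ['cHJ', 'pbn', 'Qgd', 'Ghp', 'cw=', '=']
print(chunk_decode(chunks))  # print this
```

The padding characters may end up in a chunk of their own, as the last item shows. That is harmless because decoding happens only after all chunks are joined.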
ChunkEvaluator
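The ChunkEvaluator class is similar to EchoEvaluator, except that it includes a method to encode the code into chunks. It first sends a header line with the number of chunks, and then the chunks themselves.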
import subprocess
import sys
import base64

class ChunkEvaluator:
    def __init__(self, python_executable=None, script_path="chunkscript.py"):
        if python_executable is None:
            python_executable = sys.executable
        self.p = subprocess.Popen(
            [python_executable, script_path],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding="utf-8"
        )

    def _print(self, chunk: str):
        self.p.stdin.write(chunk + "\n")
        self.p.stdin.flush()

    def _chunk_encode(self, code, size=128):
        e = base64.b64encode(code.encode("utf-8")).decode("ascii")
        chunks = [
            e[i : i + size]
            for i in range(0, len(e), size)
        ]
        return chunks

    def echo(self, code):
        chunks = self._chunk_encode(code, size=3)
        self._print(f"chunks {len(chunks)}")
        for c in chunks:
            self._print(f"{c}")
        o = []
        for c in chunks:
            o.append(self.p.stdout.readline())
        return o

    def __del__(self):
        self.p.terminate()
ChunkEvaluator with EchoScript
First, I copy echoscript.py to chunkscript.py.
cp echoscript.py chunkscript.py
ipython
In [1]: load "chunkevaluator.py"
In [2]: # %load "chunkevaluator.py"
In [3]: e = ChunkEvaluator()
In [4]: e.echo("print this")
Out[4]: ['chunks 6\n', 'cHJ\n', 'pbn\n', 'Qgd\n', 'Ghp\n', 'cw=\n']
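With the EchoScript, I confirm the first item to be the header string “chunks 6”. The other items are the base64-encoded chunks, which is correct. The last chunk (“=”) is missing from the output because the echo script also echoes the header line, while the evaluator reads only as many lines as there are chunks. Now it is time to write the proper ChunkScript.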
ChunkEvaluator with ChunkScript
The following code is written to the chunkscript.py file (overwriting all content).
import sys
import base64

def read_chunks(num):
    for i in range(num):
        line = sys.stdin.readline()
        sys.stdout.write(line)
        sys.stdout.flush()

state = "Idle"
while True:
    line = sys.stdin.readline()
    keyword, value = line.split()
    read_chunks(int(value))
In [1]: load "chunkevaluator.py"
In [2]: # %load "chunkevaluator.py"
In [3]: e = ChunkEvaluator()
In [4]: e.echo("print this")
Out[4]: ['cHJ\n', 'pbn\n', 'Qgd\n', 'Ghp\n', 'cw=\n', '=\n']
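The header line turns the stream into a sequence of messages: the reader learns from “chunks N” how many lines belong to the current message. The framing can be tried without a subprocess. In this sketch an io.StringIO object stands in for stdin, and the function name read_message is hypothetical.

```python
import io

def read_message(stream) -> list[str]:
    """Read one message: a 'chunks N' header followed by N chunk lines."""
    keyword, value = stream.readline().split()
    if keyword != "chunks":
        raise ValueError(f"unexpected header keyword: {keyword!r}")
    return [stream.readline().rstrip("\n") for _ in range(int(value))]

# Two messages back to back in one stream
stream = io.StringIO("chunks 2\ncHJ\npbn\nchunks 1\nQgd\n")
print(read_message(stream))  # ['cHJ', 'pbn']
print(read_message(stream))  # ['Qgd']
```

Because each message states its own length, the reader never has to guess where one message ends and the next one begins.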
Subproblem: Interactive Interpreter
The next subproblem to tackle is the InteractiveInterpreter, a class defined by the Python code module.
What is the Interactive Interpreter?
The Python code module defines a class named InteractiveInterpreter. It is used to implement read-eval-print loops in Python. You can use it to build an interactive REPL, exactly what is needed for the evaluator.
Here is what help(code.InteractiveInterpreter) says.
class InteractiveInterpreter(builtins.object)
| InteractiveInterpreter(locals=None)
|
| Base class for InteractiveConsole.
|
| This class deals with parsing and interpreter state (the user's
| namespace); it doesn't deal with input buffering or prompting or
| input file naming (the filename is always passed in explicitly).
Method runsource takes source code as input and evaluates / executes it.
| runsource(self, source, filename='<input>', symbol='single')
| Compile and run some source in the interpreter.
|
| Arguments are as for compile_command().
|
| One of several things can happen:
|
| 1) The input is incorrect; compile_command() raised an
| exception (SyntaxError or OverflowError). A syntax traceback
| will be printed by calling the showsyntaxerror() method.
|
| 2) The input is incomplete, and more input is required;
| compile_command() returned None. Nothing happens.
|
| 3) The input is complete; compile_command() returned a code
| object. The code is executed by calling self.runcode() (which
| also handles run-time exceptions, except for SystemExit).
|
| The return value is True in case 2, False in the other cases (unless
| an exception is raised). The return value can be used to
| decide whether to use sys.ps1 or sys.ps2 to prompt the next
| line.
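The three cases in the help text map onto the three possible outcomes of compile_command() from the codeop module. The sketch below classifies a source string without executing it; the function name classify and its return strings are assumptions made for illustration.

```python
import codeop

def classify(source: str) -> str:
    """Report which of the three runsource cases a source string falls into."""
    try:
        code_obj = codeop.compile_command(source, "<input>", "single")
    except (SyntaxError, OverflowError, ValueError):
        return "incorrect"    # case 1: compile_command() raised an exception
    if code_obj is None:
        return "incomplete"   # case 2: more input is required
    return "complete"         # case 3: a code object is ready to run

for source in ['"hello")', 'print(', 'print("hello")']:
    print(repr(source), "->", classify(source))
```

Note that a run-time error does not appear in this classification. Code that compiles is "complete" even if it raises an exception when executed.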
Runsource Output Examples
In [1]: import code
In [2]: ii = code.InteractiveInterpreter()
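Run-time error: the input is complete and compiles, but raises an exception when executed. (Strictly, this is case 3, where runcode() handles the exception; a case 1 syntax error is shown further below.)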
In [3]: ii.runsource("int(\"hello\")")
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
File /usr/lib/python3.12/code.py:90, in InteractiveInterpreter.runcode(self, code)
78 """Execute a code object.
79
80 When an exception occurs, self.showtraceback() is called to
(...) 87
88 """
89 try:
---> 90 exec(code, self.locals)
91 except SystemExit:
92 raise
File <input>:1
ValueError: invalid literal for int() with base 10: 'hello'
Out[3]: False
Case 2: correct but incomplete input.
In [4]: ii.runsource("print(")
Out[4]: True
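Note that the return value is True and nothing else happened. InteractiveInterpreter does not buffer input, so sending the rest of the code in a second call does not complete the first one. The second call is compiled on its own and fails with a syntax error (case 1).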
In [5]: ii.runsource("\"hello\")")
File <input>:1
"hello")
^
SyntaxError: unmatched ')'
Out[5]: False
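For comparison, the subclass code.InteractiveConsole does buffer incomplete input through its push() method. The evaluator in this chapter does not use it; the sketch only illustrates the difference, and the helper name push_lines is hypothetical.

```python
import code
import io
from contextlib import redirect_stdout

def push_lines(lines):
    """Feed lines one by one to an InteractiveConsole; collect return values and output."""
    console = code.InteractiveConsole()
    out = io.StringIO()
    results = []
    with redirect_stdout(out):
        for line in lines:
            # push() returns True while more input is required
            results.append(console.push(line))
    return results, out.getvalue()

results, printed = push_lines(['print(', '"hello")'])
print(results)        # [True, False]
print(repr(printed))  # 'hello\n'
```

Here the second line completes the first one, because push() joins the buffered lines before compiling them.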
Case 3: correct and complete input.
In [6]: ii.runsource("print(\"hello\")")
hello
Out[6]: False
In [7]: ii.runsource("print")
Out[7]: <function print(*args, sep=' ', end='\n', file=None, flush=False)>
Out[7]: False
Defining a variable:
In [10]: ii.runsource("x = 12")
Out[10]: False
In [11]: ii.runsource("print(f\"X: {x}\")")
X: 12
Out[11]: False
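The variable x survives between calls because the interpreter keeps one namespace dictionary. A dictionary can be passed in through the locals argument, which makes the state easy to inspect. The variable name namespace is my choice for this sketch.

```python
import code

namespace = {}
ii = code.InteractiveInterpreter(locals=namespace)

ii.runsource("x = 12")
ii.runsource("y = x * 2")   # a later call still sees x

print(namespace["x"], namespace["y"])  # 12 24
```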
The Runsource Symbol Argument
Method runsource accepts one more argument which I did not mention so far. That argument is called symbol and it takes one of three values:
- ‘single’,
- ‘exec’, or
- ‘eval’.
Perhaps it is best to see some examples to show how to use the argument.
Function Call Examples
In [1]: import code
In [2]: ii = code.InteractiveInterpreter()
In [3]: multi = """
...: def hello():
...:     print("Hello World")
...:     return 10
...: hello()
...: """
In [4]: single = "hello()"
In [5]: ii.runsource(multi, symbol='exec')
Hello World
Out[5]: False
In [6]: ii.runsource(single, symbol='exec')
Hello World
Out[6]: False
In [7]: ii.runsource(single, symbol='eval')
Hello World
Out[7]: False
In [8]: ii.runsource(single, symbol='single')
Hello World
Out[8]: 10
Out[8]: False
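Only ‘single’ displayed the result of the call (10); runsource itself returned False in every case. Note that calling runsource with multi as the source argument is only error-free with the symbol argument set to ‘exec’.
Symbol value ‘single’ means the source code input is treated as a single interactive statement. If the statement is an expression whose value is not None, the value is displayed, as in a REPL.
Symbol value ‘exec’ means the source code input is treated like a Python script. It can contain definitions and multi-line blocks, but expression results are not displayed.
Symbol value ‘eval’ means the source code input is treated as exactly one Python expression. It cannot contain statements or multiple expressions, and runsource does not display the result either.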
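The difference between the three symbol values can be made visible by replacing sys.displayhook with a collector. The following is a sketch under that assumption; the function name displayed_values is hypothetical.

```python
import code
import sys

def displayed_values(source: str, symbol: str) -> list:
    """Run source with the given symbol; return the values sent to sys.displayhook."""
    shown = []
    ii = code.InteractiveInterpreter()
    old_hook = sys.displayhook
    sys.displayhook = shown.append   # collect values instead of printing them
    try:
        ii.runsource(source, symbol=symbol)
    finally:
        sys.displayhook = old_hook   # always restore the original hook
    return shown

for symbol in ("single", "exec", "eval"):
    print(symbol, displayed_values("1 + 2", symbol))
```

Only the run with ‘single’ collects the value 3; with ‘exec’ and ‘eval’ the list stays empty.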
Unexpected Behavior: Backslashes
Writing source code as an ordinary string literal in the ipython REPL can also create unexpected errors.
In [1]: import code
In [2]: ii = code.InteractiveInterpreter()
In [3]: source = """
...: print("Hello \n World!")
...: """
In [4]: ii.runsource(source)
File <input>:2
print("Hello
^
SyntaxError: unterminated string literal (detected at line 2)
Out[4]: False
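Backslashes are the most common issue because Python treats them as escape characters. In the ordinary triple-quoted string above, \n becomes a real line break before runsource sees the source, which splits the inner string literal across two lines. A raw string (prefix r) keeps the backslash, so the correct version is here.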
In [5]: source = r"""print("Hello \n World!")"""
In [6]: ii.runsource(source)
Hello
World!
Out[6]: False
In [7]: source = r"""
...: print("Hello \n World!")
...: """
In [8]: ii.runsource(source)
Hello
World!
Out[8]: False
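The effect can be reproduced without the interpreter class by using the built-in compile() function. This sketch compares an ordinary string with a raw string; the helper name compiles is hypothetical.

```python
def compiles(source: str) -> bool:
    """Return True if source is syntactically valid Python."""
    try:
        compile(source, "<input>", "exec")
    except SyntaxError:
        return False
    return True

plain = 'print("Hello \n World!")'   # \n becomes a real line break
raw = r'print("Hello \n World!")'    # backslash and n stay two characters

print("\n" in plain, compiles(plain))  # True False
print("\n" in raw, compiles(raw))      # False True
```

Doubling the backslash in an ordinary string ("\\n") has the same effect as the raw string.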
Implementing the Evaluator and ReplScript
I take the ideas shown in the subproblems, and merge them into the evaluator code and the replscript code.
The Evaluator
import base64
import subprocess
import sys

class Evaluator:
    def __init__(self, python_executable=None, script_path="replscript.py"):
        if python_executable is None:
            python_executable = sys.executable
        self.p = subprocess.Popen(
            [python_executable, script_path],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding="utf-8"
        )

    def _print(self, chunk: str):
        self.p.stdin.write(chunk + "\n")
        self.p.stdin.flush()

    def _input(self):
        return self.p.stdout.readline().strip()

    def _chunk_encode(self, code, size=128):
        e = base64.b64encode(code.encode("utf-8")).decode("ascii")
        chunks = [
            e[i : i + size]
            for i in range(0, len(e), size)
        ]
        return chunks

    def _chunk_decode(self, chunks):
        b64_data = "".join(chunks)
        decoded = base64.b64decode(b64_data.encode("utf-8"))
        return decoded.decode("utf-8")

    def runsource_exec(self, code):
        self._print("symbol exec")
        return self._runsource(code)

    def runsource_single(self, code):
        self._print("symbol single")
        return self._runsource(code)

    def _runsource(self, code):
        chunks = self._chunk_encode(code)
        self._print(f"chunks {len(chunks)}")
        for c in chunks:
            self._print(f"{c}")
        o = []
        keyword, value = self._input().split()
        for i in range(int(value)):
            o.append(self.p.stdout.readline().strip())
        return self._chunk_decode(o)

    def __del__(self):
        self.p.terminate()
The ReplScript
import sys
import base64
import io
import code
import inspect
import re
from contextlib import redirect_stdout, redirect_stderr

ANSI_RE = re.compile(r'\x1b\[[0-?]*[ -/]*[@-~]')

def run_sources_captured(ii, source, symbol):
    out = io.StringIO()
    err = io.StringIO()
    res = io.StringIO()

    # Custom displayhook to capture expression results
    def custom_displayhook(value):
        if value is not None:
            if callable(value):
                try:
                    sig = inspect.signature(value)
                    print(f"<function {value.__name__}{sig}>", file=res)
                except (ValueError, TypeError, AttributeError):
                    print(repr(value), file=res)
            else:
                print(repr(value), file=res)

    old_displayhook = sys.displayhook
    sys.displayhook = custom_displayhook
    try:
        with redirect_stdout(out), redirect_stderr(err):
            more = ii.runsource(source, symbol=symbol)
        if more:
            res.write("[incomplete input]\n")
    finally:
        sys.displayhook = old_displayhook
    output = out.getvalue() + err.getvalue() + res.getvalue()
    return ANSI_RE.sub('', output)

def chunk_encode(code, size=128):
    e = base64.b64encode(code.encode("utf-8")).decode("ascii")
    chunks = [
        e[i : i + size]
        for i in range(0, len(e), size)
    ]
    return chunks

def chunk_decode(chunks):
    b64_data = "".join(chunks)
    decoded = base64.b64decode(b64_data.encode("utf-8"))
    return decoded.decode("utf-8")

def read_chunks(num):
    chunks = []
    for i in range(num):
        line = sys.stdin.readline()
        chunks.append(line.strip())
    return chunks

def write_chunks(chunks):
    sys.stdout.write(f"chunks {len(chunks)}" + "\n")
    for c in chunks:
        sys.stdout.write(c + "\n")
    sys.stdout.flush()

# One interpreter for the whole session, so that state persists between calls
ii = code.InteractiveInterpreter()

while True:
    # First header line: "symbol <single|exec>"
    line = sys.stdin.readline()
    if not line:
        break  # stdin was closed, the evaluator is gone
    keyword, symbol = line.split()
    # Second header line: "chunks <count>"
    line = sys.stdin.readline()
    keyword, value = line.split()
    chunks = read_chunks(int(value))
    decoded = chunk_decode(chunks)
    if symbol == "single":
        output = run_sources_captured(ii, decoded, symbol)
    else:
        output = run_sources_captured(ii, decoded, 'exec')
    # Reply with the same framing: "chunks <count>", then the chunks
    write_chunks(chunk_encode(output))
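The capturing step of the script can be tried in-process, without the subprocess and the chunk protocol. The following is a simplified sketch of the idea behind run_sources_captured, not the function itself: it writes everything into one buffer and leaves out the ANSI filtering and the special handling of callables. The function name capture is hypothetical.

```python
import code
import io
import sys
from contextlib import redirect_stdout, redirect_stderr

def capture(ii, source, symbol="single"):
    """Run source in the interpreter ii; return everything it printed as a string."""
    buffer = io.StringIO()

    def hook(value):
        # Expression results go into the same buffer as printed text
        if value is not None:
            buffer.write(repr(value) + "\n")

    old_hook = sys.displayhook
    sys.displayhook = hook
    try:
        with redirect_stdout(buffer), redirect_stderr(buffer):
            ii.runsource(source, symbol=symbol)
    finally:
        sys.displayhook = old_hook
    return buffer.getvalue()

ii = code.InteractiveInterpreter()
print(repr(capture(ii, "x = 6 * 7")))  # ''
print(repr(capture(ii, "print(x)")))   # '42\n'
print(repr(capture(ii, "x + 1")))      # '43\n'
print(capture(ii, "1 / 0"))            # traceback text, normally ending in ZeroDivisionError
```

Using a single buffer keeps stdout and stderr text in the order in which it was written. With three separate buffers, as in the script, the parts are concatenated in a fixed order instead.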
Interactive Use
In [1]: import evaluator
In [2]: e = evaluator.Evaluator()
In [3]: e.runsource_single("print(\"hello world\")")
Out[3]: 'hello world\n'