Making a REPL with an Evaluator

In the previous chapter, I have shown a Toolkit component. The Toolkit contains definitions for function tools for the LLM API. But it does not explicitly perform a tool call (that will be done in later chapters).

In this chapter, I show the evaluator component. The evaluator is a program to which you can send program code. A code interpreter. But the focus here is an evaluator that the LLM can interact with. The LLM sends a function tool call to interact with the evaluator.

Here are my two goals for this chapter. The evaluator must:

  • manage a separate process for a Python interpreter,
  • provide a method to send code to the interpreter and return as a string the output of the stdout and stderr.

The evaluator is a complex topic. Perhaps it is best to subdivide the problem. I thought about it for some time, and came up with the following subproblems:

  • echo script,
  • base64-encoded chunks echo script, and
  • interactive interpreter script.

Each subproblem is dealt with separately. But the last section shows a final evaluator implementation.

Subproblem: Echo Script

An echo script reads input from stdin. Then it prints back the output. The exact same output is printed to stdout.

EchoScript

Here is the code for the echoscript.py file.

import sys

while True:
    for line in sys.stdin:
        sys.stdout.write(line)
        sys.stdout.flush()

(Note: SIGTERM will terminate a Python process running a forever loop. That is, unless the signal handler is overriden or interrupts are disabled.)

EchoEvaluator

The following code is written in an echoevaluator.py file.

import subprocess
import sys

class EchoEvaluator:
    def __init__(self, python_executable=None, script_path="echoscript.py"):
        if python_executable is None:
            python_executable = sys.executable
        self.p = subprocess.Popen(
            [python_executable, script_path],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding="utf-8"
        )

    def _print(self, chunk: str):
        self.p.stdin.write(chunk + "\n")
        self.p.stdin.flush()

    def echo(self, code):
        self._print(f"{code}")
        return self.p.stdout.readline()

    def __del__(self):
        self.p.terminate()

Example Use

In [1]: load "echoevaluator.py"

In [2]: # %load "echoevaluator.py"
In [3]: e = EchoEvaluator()
In [4]: e.echo("print this")
Out[4]: 'print this\n'

Subproblem: Encoded Chunk Echo Script

An encoded chunk echo script is like an echo script, but the text is divided into base64-encoded chunks.

Base64 Encoded Chunks

Here is how to encode a string into base64 and split it into chunks of three letters.

In [1]: import base64
In [2]: base64.b64encode("Test string".encode("utf-8")).decode("ascii")
Out[2]: 'VGVzdCBzdHJpbmc='
In [3]: encoded = base64.b64encode("print this".encode("utf-8")).decode("ascii")
In [4]: chunks = [
   ...:     encoded[i: i + 3]
   ...:     for i in range(0, len(encoded), 3)
   ...: ]
In [5]: chunks
Out[5]: ['cHJ', 'pbn', 'Qgd', 'Ghp', 'cw=', '=']

ChunkEvaluator

The ChunkEvaluator class is similar to EchoEvaluator. Except it includes a method to encode the chunks.

import subprocess
import sys
import base64

class ChunkEvaluator:
    def __init__(self, python_executable=None, script_path="chunkscript.py"):
        if python_executable is None:
            python_executable = sys.executable
        self.p = subprocess.Popen(
            [python_executable, script_path],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding="utf-8"
        )

    def _print(self, chunk: str):
        self.p.stdin.write(chunk + "\n")
        self.p.stdin.flush()

    def _chunk_encode(self, code, size=128):
        e = base64.b64encode(code.encode("utf-8")).decode("ascii")
        chunks = [
            e[i : i + size]
            for i in range(0, len(e), size)
        ]
        return chunks

    def echo(self, code):
        chunks = self._chunk_encode(code, size=3)
        self._print(f"chunks {len(chunks)}")
        for c in chunks:
            self._print(f"{c}")
        o = []
        for c in chunks:
            o.append(self.p.stdout.readline())
        return o

    def __del__(self):
        self.p.terminate()

ChunkEvaluator with EchoScript

First, I copy echoscript.py to chunkscript.py.

cp echoscript.py chunkscript.py
ipython
In [1]: load "chunkevaluator.py"

In [2]: # %load "chunkevaluator.py"
In [3]: e = ChunkEvaluator()
In [4]: e.echo("print this")
Out[4]: ['chunks 6\n', 'cHJ\n', 'pbn\n', 'Qgd\n', 'Ghp\n', 'cw=\n']

With the EchoScript, I confirm the first item to be the string “chunk 6”. All the other items are base64 encoded chunks, which is correct. Now it is time to write the proper ChunkScript.

ChunkEvaluator with ChunkScript

The following code is written to the chunkscript.py file (overwriting all content).

import sys
import base64

def read_chunks(num):
    for i in range(num):
        line = sys.stdin.readline()
        sys.stdout.write(line)
        sys.stdout.flush()
    state = "Idle"

while True:
    line = sys.stdin.readline()
    keyword, value = line.split()
    read_chunks(int(value))
In [1]: load "chunkevaluator.py"

In [2]: # %load "chunkevaluator.py"
In [3]: e = ChunkEvaluator()
In [4]: e.echo("print this")
Out[4]: ['cHJ\n', 'pbn\n', 'Qgd\n', 'Ghp\n', 'cw=\n', '=\n']

Subproblem: Interactive Interpreter

The next subproblem to tackle is the InteractiveInterpreter, a class defined by the Python code module.

What is the Interactive Interpreter?

The Python code module defines a class named InteractiveInterpreter. It is used to implement read-eval-print loops in Python. You can use it to build an interactive REPL, exactly what is needed for the evaluator.

Here is what help(code.InteractiveInterpreter) says.

class InteractiveInterpreter(builtins.object)
 |  InteractiveInterpreter(locals=None)
 |
 |  Base class for InteractiveConsole.
 |
 |  This class deals with parsing and interpreter state (the user's
 |  namespace); it doesn't deal with input buffering or prompting or
 |  input file naming (the filename is always passed in explicitly).

Method runsource takes source code as input and evaluates / executes it.

 |  runsource(self, source, filename='<input>', symbol='single')
 |      Compile and run some source in the interpreter.
 |
 |      Arguments are as for compile_command().
 |
 |      One of several things can happen:
 |
 |      1) The input is incorrect; compile_command() raised an
 |      exception (SyntaxError or OverflowError).  A syntax traceback
 |      will be printed by calling the showsyntaxerror() method.
 |
 |      2) The input is incomplete, and more input is required;
 |      compile_command() returned None.  Nothing happens.
 |
 |      3) The input is complete; compile_command() returned a code
 |      object.  The code is executed by calling self.runcode() (which
 |      also handles run-time exceptions, except for SystemExit).
 |
 |      The return value is True in case 2, False in the other cases (unless
 |      an exception is raised).  The return value can be used to
 |      decide whether to use sys.ps1 or sys.ps2 to prompt the next
 |      line.

Runsource Output Examples

In [1]: import code
In [2]: ii = code.InteractiveInterpreter()

Case 1: incorrect input

In [3]: ii.runsource("int(\"hello\")")
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
File /usr/lib/python3.12/code.py:90, in InteractiveInterpreter.runcode(self, code)
     78 """Execute a code object.
     79 
     80 When an exception occurs, self.showtraceback() is called to
   (...)     87 
     88 """
     89 try:
---> 90     exec(code, self.locals)
     91 except SystemExit:
     92     raise

File <input>:1

ValueError: invalid literal for int() with base 10: 'hello'
Out[3]: False

Case 2: correct but incomplete input.

In [4]: ii.runsource("print(")
Out[4]: True

Note the return value is True. Nothing happened. Sending more code does not complete the input.

In [5]: ii.runsource("\"hello\")")
  File <input>:1
    "hello")
           ^
SyntaxError: unmatched ')'

Out[5]: False

Case 3: correct and complete input.

In [6]: ii.runsource("print(\"hello\")")
hello
Out[6]: False
In [7]: ii.runsource("print")
Out[7]: <function print(*args, sep=' ', end='\n', file=None, flush=False)>
Out[7]: False

Defining a variable:

In [10]: ii.runsource("x = 12")
Out[10]: False
In [11]: ii.runsource("print(f\"X: {x}\")")
X: 12
Out[11]: False

The Runsource Symbol Argument

Method runsource accepts one more argument which I did not mention so far. That argument is called symbol and it takes one of three values:

  • ‘single’,
  • ‘exec’, or
  • ‘eval’.

Perhaps it is best to see some examples to show how to use the argument.

Function Call Examples

In [1]: import code

In [2]: ii = code.InteractiveInterpreter()

In [3]: multi = """
   ...: def hello():
   ...:     print("Hello World")
   ...:     return 10
   ...: hello()
   ...: """

In [4]: single = "hello()"

In [5]: ii.runsource(multi, symbol='exec')
Hello World
Out[5]: False

In [6]: ii.runsource(single, symbol='exec')
Hello World
Out[6]: False

In [7]: ii.runsource(single, symbol='eval')
Hello World
Out[7]: False

In [8]: ii.runsource(single, symbol='single')
Hello World
Out[8]: 10
Out[8]: False

Only ‘single’ returned the result. Note that calling runsource with multi as the source argument is only error-free with the symbol argument set to exec.

Symbol value ‘exec’ means the source code input is treated like a Python script. It can contain definitions and multiple-block lines. But it cannot return a result.

Symbol value ‘eval’ means the source code input is treated as exactly one Python expression. It cannot contain multiple expressions.

Unexpected Behavior: Backslashes

Writing in the ipython REPL also creates unexpected errors.

In [1]: import code

In [2]: ii = code.InteractiveInterpreter()

In [3]: source = """
   ...: print("Hello \n World!")
   ...: """

In [4]: ii.runsource(source)
  File <input>:2
    print("Hello
          ^
SyntaxError: unterminated string literal (detected at line 2)

Out[4]: False

Backslashes are the most common issue because Python treats them as escape characters. The correct version is here.

In [5]: source = r"""print("Hello \n World!")"""

In [6]: ii.runsource(source)
Hello 
 World!
Out[6]: False

In [7]: source = r"""
   ...: print("Hello \n World!")
   ...: """

In [8]: ii.runsource(source)
Hello 
 World!
Out[8]: False

Implementing the Evaluator and ReplScript

I take the ideas shown in the subproblems, and merge them into the evaluator code and the replscript code.

The Evaluator

import base64
import subprocess
import sys

class Evaluator():
    def __init__(self, python_executable=None, script_path="replscript.py"):
        if python_executable is None:
            python_executable = sys.executable
        self.p = subprocess.Popen(
            [python_executable, script_path],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding="utf-8"
        )

    def _print(self, chunk: str):
        self.p.stdin.write(chunk + "\n")
        self.p.stdin.flush()

    def _input(self):
        return self.p.stdout.readline().strip()

    def _chunk_encode(self, code, size=128):
        e = base64.b64encode(code.encode("utf-8")).decode("ascii")
        chunks = [
            e[i : i + size]
            for i in range(0, len(e), size)
        ]
        return chunks

    def _chunk_decode(self, chunks):
        b64_data = "".join(chunks)
        decoded = base64.b64decode(b64_data.encode("utf-8"))
        return decoded.decode("utf-8")

    def runsource_exec(self, code):
        self._print(f"symbol exec")
        return self._runsource(code)

     def runsource_single(self, code):
        self._print(f"symbol single")
        return self._runsource(code)

    def _runsource(self, code):
        chunks = self._chunk_encode(code)
        self._print(f"chunks {len(chunks)}")
        for c in chunks:
            self._print(f"{c}")
        o = []
        keyword, value = self._input().split()
        for i in range(int(value)):
            o.append(self.p.stdout.readline().strip())
        return self._chunk_decode(o)

    def __del__(self):
        self.p.terminate()

The ReplScript

import sys
import base64
import io
import code
import inspect
import re

from contextlib import redirect_stdout, redirect_stderr

ANSI_RE = re.compile(r'\x1b\[[0-?]*[ -/]*[@-~]')

def run_sources_captured(ii, source, symbol):
    out = io.StringIO()
    err = io.StringIO()
    res = io.StringIO()

    # Custom displayhook to capture expression results
    def custom_displayhook(value):
        if value is not None:
            if callable(value):
                try:
                    sig = inspect.signature(value)
                    print(f"<function {value.__name__}{sig}>", file=res)
                except (ValueError, TypeError):
                    print(repr(value), file=res)
            else:
                print(repr(value), file=res)

    old_displayhook = sys.displayhook
    sys.displayhook = custom_displayhook

    try:
        with redirect_stdout(out), redirect_stderr(err):
            more = ii.runsource(source, symbol=symbol)
            if more:
                res.write("[incomplete input]\n")
    finally:
        sys.displayhook = old_displayhook

    output = out.getvalue() + err.getvalue() + res.getvalue()
    return ANSI_RE.sub('', output)

def chunk_encode(code, size=128):
    e = base64.b64encode(code.encode("utf-8")).decode("ascii")
    chunks = [
        e[i : i + size]
        for i in range(0, len(e), size)
    ]
    return chunks

def chunk_decode(chunks):
    b64_data = "".join(chunks)
    decoded = base64.b64decode(b64_data.encode("utf-8"))
    return decoded.decode("utf-8")

def read_chunks(num):
    chunks = []
    for i in range(num):
        line = sys.stdin.readline()
        chunks.append(line)
    return chunks

def write_chunks(chunks):
    sys.stdout.write(f"chunks {len(chunks)}" + "\n")
    for c in chunks:
        sys.stdout.write(c + "\n")
    sys.stdout.flush()

while True:
    line = sys.stdin.readline()
    keyword, value = line.split()
    symbol = value

    line = sys.stdin.readline()
    keyword, value = line.split()
    num_chunks = value

    chunks = read_chunks(int(value))
    decoded = chunks_decode(chunks)

    output = []
    if symbol == "single"
        output = run_sources_captured(ii, decoded, symbol)
    else:
        output = run_sources_captured(ii, decoded, 'exec')

    chunks = chunk_encode(output)

Interactive Use

In [1]: import evaluator

In [2]: e = evaluator.Evaluator()

In [3]: e.runsource_single("print(\"hello world\")")
Out[3]: 'hello world\n'