from __future__ import annotations
import logging
from itertools import islice
from typing import Any, Dict, List, Optional, Tuple, Union
from .exceptions import (
BlocknameDuplicateError,
ProcessError,
StopError,
TagScriptError,
WorkloadExceededError,
)
from .interface import Adapter, Block
from .utils import maybe_await
from .verb import Verb
__all__ = (
"Interpreter",
"AsyncInterpreter",
"Context",
"Response",
"Node",
"build_node_tree",
)
log = logging.getLogger(__name__)
AdapterDict = Dict[str, Adapter]
[docs]class Node:
"""
A low-level object representing a bracketed block.
Attributes
----------
coordinates: Tuple[int, int]
The start and end position of the bracketed text block.
verb: Optional[Verb]
The determined Verb for this node.
output:
The `Block` processed output for this node.
"""
__slots__ = ("output", "verb", "coordinates")
def __init__(self, coordinates: Tuple[int, int], verb: Optional[Verb] = None) -> None:
"""
Constructing the Node
"""
self.coordinates = coordinates
self.verb = verb
self.output: Optional[str] = None
def __str__(self) -> str:
"""
String function
"""
return str(self.verb) + " at " + str(self.coordinates)
def __repr__(self) -> str:
"""
String repr
"""
return f"<Node verb={self.verb!r} coordinates={self.coordinates!r} output={self.output!r}>"
[docs]def build_node_tree(message: str) -> List[Node]:
"""
Function that finds all possible nodes in a string.
Parameters
----------
message: str
The string to find nodes in.
Returns
-------
List[Node]
A list of all possible text bracket blocks.
"""
nodes = []
# previous = r""
starts = []
for i, ch in enumerate(message):
if ch == "{": # and previous[1:] != "\\":
starts.append(i)
if ch == "}": # and previous[1:] != "\\":
if not starts:
continue
coords = (starts.pop(), i)
n = Node(coords)
nodes.append(n)
# previous = previous[:1] + ch
return nodes
[docs]class Response:
"""
An object containing information on a completed TagScript process.
Attributes
----------
body: str
The cleaned message with all verbs interpreted.
actions: Dict[str, Any]
A dictionary that blocks can access and modify to define post-processing actions.
variables: Dict[str, Adapter]
A dictionary of variables that blocks such as the `LooseVariableGetterBlock` can access.
extras: Dict[str, Any]
A dictionary of extra keyword arguments that blocks can use to define their own behavior.
"""
__slots__ = ("body", "actions", "variables", "extras")
def __init__(self, *, variables: AdapterDict = None, extras: Dict[str, Any] = None) -> None:
self.body: str = None
self.actions: Dict[str, Any] = {}
self.variables: AdapterDict = variables if variables is not None else {}
self.extras: Dict[str, Any] = extras if extras is not None else {}
def __repr__(self) -> str:
"""
String repr
"""
return f"<Response body={self.body!r} actions={self.actions!r} variables={self.variables!r} extras={self.extras!r}>"
[docs]class Context:
"""
An object containing data on the TagScript block processed by the interpreter.
This class is passed to adapters and blocks during processing.
Attributes
----------
verb: Verb
The Verb object representing a TagScript block.
original_message: str
The original message passed to the interpreter.
interpreter: Interpreter
The interpreter processing the TagScript.
"""
__slots__ = ("verb", "original_message", "interpreter", "response")
def __init__(self, verb: Verb, res: Response, interpreter: Interpreter, og: str) -> None:
"""
Construct the context
"""
self.verb: Verb = verb
self.original_message: str = og
self.interpreter: Interpreter = interpreter
self.response: Response = res
def __repr__(self) -> str:
"""
String repr
"""
return f"<Context verb={self.verb!r}>"
[docs]class Interpreter:
"""
The TagScript interpreter.
Attributes
----------
blocks: UnionList[Block]
A list or tuple of blocks to be used for TagScript processing.
"""
__slots__ = ("blocks", "_blocknames")
def __init__(self, blocks: Union[List[Block], Tuple[Block]]) -> None:
"""
Creates a list of blocks, and also gets all acceptable names for processing
Raises
------
BlocknameDuplicateError
If there are duplicate blocknames.
"""
self.blocks: Union[List[Block], Tuple[Block]] = blocks
self._blocknames = []
for block in blocks:
for name in block.ACCEPTED_NAMES:
if block in self._blocknames:
raise BlocknameDuplicateError(block)
self._blocknames.append(name)
def __repr__(self) -> str:
"""
String repr
"""
return f"<{type(self).__name__} blocks={self.blocks!r}>"
def _get_context(
self,
node: Node,
final: str,
*,
response: Response,
original_message: str,
verb_limit: int,
) -> Context:
"""
Construct a context object for a node.
Parameters
----------
node: Node
The node to construct the context for.
final: str
The final message to be processed.
response: Response
The response object to be passed to the context.
original_message: str
The original message passed to the interpreter.
verb_limit: int
The maximum number of verbs to process.
Returns
-------
Context
The constructed context.
"""
# Get the updated verb string from coordinates and make the context
start, end = node.coordinates
node.verb = Verb(final[start : end + 1], limit=verb_limit)
return Context(node.verb, response, self, original_message)
def _get_acceptors(self, ctx: Context) -> Tuple[Block]:
"""
Get a list of acceptors
Parameters
----------
ctx: Context
The context to get the acceptors for.
Returns
-------
Tuple[Block]
"""
acceptors = (b for b in self.blocks if b.will_accept(ctx))
log.debug("%r acceptors: %r", ctx, acceptors)
return acceptors
def _process_blocks(self, ctx: Context, node: Node) -> Optional[str]:
"""
Process the blocks
Parameters
----------
ctx: Context
The context to process the blocks from.
node: Node
The node to process the blocks from.
Returns
-------
Optional[str]
The final message
"""
acceptors = self._get_acceptors(ctx)
for b in acceptors:
value = b.process(ctx)
if value is not None: # Value found? We're done here.
value = str(value)
node.output = value
return value
@staticmethod
def _check_workload(charlimit: int, total_work: int, output: str) -> Optional[int]:
"""
Check if the workload has been exceeded.
Parameters
----------
charlimit: int
The maximum number of characters to process.
total_work: int
The total number of characters processed.
output: str
The output string.
Returns
-------
Optional[int]
The total amount of work that has been processed.
Raises
------
WorkloadExceededError
If the workload has been exceeded.
"""
if not charlimit:
return
total_work += len(output)
if total_work > charlimit:
raise WorkloadExceededError(
"The TSE interpreter had its workload exceeded. The total characters "
f"attempted were {total_work}/{charlimit}"
)
return total_work
@staticmethod
def _text_deform(start: int, end: int, final: str, output: str) -> Tuple[str, int]:
"""
Deform the text, replacing code with what was outputted.
Parameters
----------
start: int
The start index of the code.
end: int
The end index of the code.
final: str
The final message.
output: str
The output string.
Returns
-------
Tuple[str, int]
The new final message, and the change in final length after the change has been applied.
"""
message_slice_len = (end + 1) - start
replacement_len = len(output)
differential = (
replacement_len - message_slice_len
) # The change in size of `final` after the change is applied
final = final[:start] + output + final[end + 1 :]
return final, differential
@staticmethod
def _translate_nodes(
node_ordered_list: List[Node], index: int, start: int, differential: int
) -> None:
"""
Get the new coordinates for each node.
Parameters
----------
node_ordered_list: List[Node]
The list of nodes to translate.
index: int
The index of the node to translate.
start: int
The start index of the code.
differential: int
The change in final length after the change has been applied.
Returns
-------
None
"""
for future_n in islice(node_ordered_list, index + 1, None):
new_start = None
new_end = None
if future_n.coordinates[0] > start:
new_start = future_n.coordinates[0] + differential
else:
new_start = future_n.coordinates[0]
if future_n.coordinates[1] > start:
new_end = future_n.coordinates[1] + differential
else:
new_end = future_n.coordinates[1]
future_n.coordinates = (new_start, new_end)
def _solve(
self,
message: str,
node_ordered_list: List[Node],
response: Response,
*,
charlimit: int,
verb_limit: int = 2000,
) -> Optional[str]:
"""
Solve the tagscript by proccessing all possible nodes.
Parameters
----------
message: str
The message to process.
node_ordered_list: List[Node]
The list of nodes to process.
response: Response
The response object to be passed to the context.
charlimit: int
The maximum number of characters to process.
verb_limit: int
The maximum number of verbs to process.
Returns
-------
Optional[str]
The final, completely processed message.
"""
final = message
total_work = 0
for index, node in enumerate(node_ordered_list):
start, end = node.coordinates
ctx = self._get_context(
node,
final,
response=response,
original_message=message,
verb_limit=verb_limit,
)
log.debug("Processing context %r at (%r, %r)", ctx, start, end)
try:
output = self._process_blocks(ctx, node)
except StopError as exc:
log.debug("StopError raised on node %r", node, exc_info=exc)
return final[:start] + exc.message
if output is None:
continue # If there was no value output, no need to text deform.
total_work = self._check_workload(charlimit, total_work, output)
final, differential = self._text_deform(start, end, final, output)
self._translate_nodes(node_ordered_list, index, start, differential)
return final
@staticmethod
def _return_response(response: Response, output: str) -> Response:
"""
Return the response object.
Parameters
----------
response: Response
The response object to be returned.
output: str
The output string.
Returns
-------
Response
The response object.
"""
if response.body is None:
response.body = output.strip()
else:
# Dont override an overridden response.
response.body = response.body.strip()
return response
[docs] def process(
self,
message: str,
seed_variables: AdapterDict = None,
*,
charlimit: Optional[int] = None,
**kwargs,
) -> Response:
"""
Processes a given TagScript string.
Parameters
----------
message: str
A TagScript string to be processed.
seed_variables: Dict[str, Adapter]
A dictionary containing strings to adapters to provide context variables for processing.
charlimit: int
The maximum characters to process.
kwargs: Dict[str, Any]
Additional keyword arguments that may be used by blocks during processing.
Returns
-------
Response
A response object containing the processed body, actions and variables.
Raises
------
TagScriptError
A block intentionally raised an exception, most likely due to invalid user input.
WorkloadExceededError
Signifies the interpreter reached the character limit, if one was provided.
ProcessError
An unexpected error occurred while processing blocks.
"""
response = Response(variables=seed_variables, extras=kwargs)
node_ordered_list = build_node_tree(message)
try:
output = self._solve(
message,
node_ordered_list,
response,
charlimit=charlimit,
)
except TagScriptError:
raise
except Exception as error:
raise ProcessError(error, response, self) from error
return self._return_response(response, output)
[docs]class AsyncInterpreter(Interpreter):
"""
An asynchronous subclass of `Interpreter` that allows blocks to implement asynchronous methods.
Synchronous blocks are still supported.
This subclass has no additional attributes from the `Interpreter` class.
See `Interpreter` for full documentation.
"""
async def _get_acceptors(self, ctx: Context) -> Tuple[Block]:
"""
Get a list of acceptors
Parameters
----------
ctx: Context
The context to get the acceptors for.
Returns
-------
Tuple[Block]
"""
return (b for b in self.blocks if await maybe_await(b.will_accept, ctx))
async def _process_blocks(self, ctx: Context, node: Node) -> Optional[str]:
"""
Process the blocks
Parameters
----------
ctx: Context
The context to process the blocks from.
node: Node
The node to process the blocks from.
Returns
-------
Optional[str]
The final message
"""
acceptors = await self._get_acceptors(ctx)
for b in acceptors:
value = await maybe_await(b.process, ctx)
if value is not None: # Value found? We're done here.
value = str(value)
node.output = value
return value
async def _solve(
self,
message: str,
node_ordered_list: List[Node],
response: Response,
*,
charlimit: int,
verb_limit: int = 2000,
) -> Optional[str]:
"""
Solve the tagscript by proccessing all possible nodes.
Parameters
----------
message: str
The message to process.
node_ordered_list: List[Node]
The list of nodes to process.
response: Response
The response object to be passed to the context.
charlimit: int
The maximum number of characters to process.
verb_limit: int
The maximum number of verbs to process.
Returns
-------
Optional[str]
The final, completely processed message.
"""
final = message
total_work = 0
for index, node in enumerate(node_ordered_list):
start, end = node.coordinates
ctx = self._get_context(
node,
final,
response=response,
original_message=message,
verb_limit=verb_limit,
)
try:
output = await self._process_blocks(ctx, node)
except StopError as exc:
return final[:start] + exc.message
if output is None:
continue # If there was no value output, no need to text deform.
total_work = self._check_workload(charlimit, total_work, output)
final, differential = self._text_deform(start, end, final, output)
self._translate_nodes(node_ordered_list, index, start, differential)
return final
[docs] async def process(
self,
message: str,
seed_variables: AdapterDict = None,
*,
charlimit: Optional[int] = None,
**kwargs,
) -> Response:
"""
Asynchronously process a given TagScript string.
This method has no additional attributes from the `Interpreter` class.
See `Interpreter.process` for full documentation.
"""
response = Response(variables=seed_variables, extras=kwargs)
node_ordered_list = build_node_tree(message)
try:
output = await self._solve(
message,
node_ordered_list,
response,
charlimit=charlimit,
)
except TagScriptError:
raise
except Exception as error:
raise ProcessError(error, response, self) from error
return self._return_response(response, output)