A.M.I.C.A.
What does it take to build a system where multiple agents can interact asynchronously, respond to user queries and take proactive action when things change around them?
Many articles already explain how to build an agent with existing libraries and SDKs. Here, I’ll cover what else you need to create a multi-agent system: a future-proof architecture that supports asynchronous communication, works with any tooling or models, and runs locally on varied hardware.
Prerequisites: This is a dense article that introduces many concepts without fully defining them. Some familiarity with LLMs, the concept of agentic AI, and how to build agents is advised, particularly with LangChain4J and Jlama.
I’m especially interested in building a Jarvis-like home automation system with Raspberry Pi nodes communicating through a Mosquitto MQTT server. That said, I will keep everything implementation-agnostic, so that any piece can be replaced by another if required.
This article describes A.M.I.C.A. (“Autonomous Multi-agent Intelligent Companion Assistant”), a Proof of Concept (PoC) for an agentic platform that fulfills the following requirements:
- Multi-agent: I want the system to be able to hold multiple agents that interact asynchronously.
- Language-agnostic: I want to be able to integrate agents written using different languages and frameworks.
- Distributed: I want to have agents running on different hardware.
With the requirements clear, let’s start from the beginning.
What is an Agent?
There is no consensus on what an AI agent actually is. The academic definition given by Russell and Norvig describes an Agent as “Anything that can be viewed as perceiving its environment through sensors and acting upon that environment through actuators”.
There is also a more modern approach, very well represented by Anthropic’s definition, that considers that Agents “are systems where LLMs dynamically direct their own processes and tool usage, maintaining control over how they accomplish tasks”.
We can distill both definitions into key requirements for an agentic system:
- Autonomy: An agent can answer user questions, but can also react to any change in its environment.
- Adaptability: Contrary to deterministic software applications, Agents can adapt their behavior to their context.
- Tool usage: An agent has a set of tools and decides which one to use for the task at hand.
Large Language Models (LLMs) are trained not only to generate consistent text, but also to use that capacity to reason and come up with chains of steps to solve problems. This capability makes them valuable tools for building software Agents.
Do LLMs actually reason?
Even though language is the tool we use to communicate logical relations, and Large Language Models excel at generating language, it is far from clear that this is enough to consider that an LLM can (or will ever be able to) reason like a human. Some authors think that “some extra component” needs to be added to an LLM to make it actually reason like a human.
What do we need to run a multi-agent system?
To build a multi-agent platform, we need three things: a message bus, shared memory, and tool support. Let’s see each one in detail.
Message Bus
An agent has to be able to receive notifications when the environment changes, and to send messages to the user or to other agents. There needs to be some kind of Message Bus that the agent will use to subscribe to changes in the environment, and to publish notifications. There will be at least three channels in the bus:
- System Events Channel, so that the Agent detects when the world changes.
- User Request Channel, for the user to communicate with the available agents.
- Agent Responses Channel, for messages that Agents publish in response to user requests.
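To make the three channels concrete, here is a minimal in-memory sketch of a publish/subscribe bus. It is not the A.M.I.C.A. implementation: the names `Channel` and `InMemoryBus` are made up for this example, payloads are plain strings, and delivery is synchronous, which a real broker would not be.

```java
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical names for illustration; not part of the A.M.I.C.A. SDK.
enum Channel { SYSTEM_EVENTS, USER_REQUESTS, AGENT_RESPONSES }

final class InMemoryBus {
  private final Map<Channel, List<Consumer<String>>> subscribers = new EnumMap<>(Channel.class);

  InMemoryBus() {
    // One subscriber list per channel, created up front.
    for (Channel channel : Channel.values()) {
      subscribers.put(channel, new CopyOnWriteArrayList<>());
    }
  }

  void subscribe(Channel channel, Consumer<String> consumer) {
    subscribers.get(channel).add(consumer);
  }

  // Delivers the payload to every subscriber of the channel, on the caller thread.
  void publish(Channel channel, String payload) {
    subscribers.get(channel).forEach(consumer -> consumer.accept(payload));
  }
}

final class InMemoryBusDemo {
  public static void main(String[] args) {
    InMemoryBus bus = new InMemoryBus();
    // An "agent" answers every user request on the responses channel.
    bus.subscribe(Channel.USER_REQUESTS,
        request -> bus.publish(Channel.AGENT_RESPONSES, "echo: " + request));
    // The "client" prints whatever the agents publish.
    bus.subscribe(Channel.AGENT_RESPONSES, System.out::println);
    bus.publish(Channel.USER_REQUESTS, "hello");
  }
}
```

Keeping requests, responses and system events on separate channels means an agent only pays for the traffic it subscribes to.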
Shared Memory
Agents collaborate in user conversations, so each one must know what others have already said.
We will need a shared resource where Agents can collaboratively keep track of the conversations. Other potential uses (that I have not implemented in this initial version) of this storage are:
- Caching the results of tool executions
- Keeping data about the system status that will help make decisions
- Acting as a knowledge repository for RAG applications
Shared (or not) tools
Agents will need tools to perform their tasks. Some tools will be specific to a particular Agent, others can be of use to more than one. Having a shared framework that supports tool management and execution will simplify this task.
A.M.I.C.A. components
Now that we understand the goals we need to accomplish in this project, let’s describe the different components that make up the platform. As always, the code for this project is available in a GitHub repository.
A.M.I.C.A. is composed of several projects:
$ tree -L 2
.
├── agents
│   ├── local-system-agent
│   └── minute-taker-agent
├── amica-cli
├── amica-sdk-java
├── doc
├── docker-compose.yml
├── mosquitto
- a Java SDK (more versions might be created in the future, for example a Python SDK)
- two example agents, built with:
  - Micronaut as application framework
  - LangChain4J as AI tooling
  - Jlama for local LLM execution
- a CLI application, built with Micronaut
- a dockerized Mosquitto server
- a dockerized Valkey server
- a docker-compose file to run Mosquitto and Valkey
The following diagram describes how these pieces interact with each other:
The Message Broker (MQTT) and Shared Memory (Valkey) are the center of the system, allowing several agents to interact by publishing and subscribing to different types of messages, and by storing shared resources, like the conversations they participate in with the user.
The agents are language and platform independent. Agents using Java and LangChain4J can interact with others written with Python and LangGraph, and with a CLI application written with TypeScript, for example.
Let’s discuss each of these components in detail.
Message Bus
The Message Bus is used by the agents to interact with the user, and with other agents. Since this system is language agnostic, all the accepted payload types are defined as JSON Schema files. The Java SDK uses the JSON Schema to Java Record plugin to generate Java code from the payload definitions.
There needs to be a component in the SDK that allows agents to subscribe to messages sent by the user, as well as to send other messages in response. That same component allows agents to listen for system events.
In the Java SDK, that component implements the MessageBus interface:
public interface MessageBus {
  void send(Message<?> message);

  default void send(Message<?>... messages) {
    Arrays
        .stream(messages)
        .forEach(this::send);
  }

  void send(
      Message<?> message,
      MessageConsumer<AgentResponse> responseConsumer);

  void respond(
      MessageId originalMessageId,
      Message<AgentResponse> response);

  <P extends MessagePayload> void registerConsumer(
      MessageTopic messageTopic,
      Class<P> payloadType,
      MessageConsumer<P> consumer);
}
The default implementation of this interface is HiveMqttMessageBus, which uses an MQTT broker (Mosquitto in this case) to handle all the use cases.
The way this is implemented is by using two main topics:
- es.nachobrito.amica.user.request.v1: agents that handle user requests will subscribe to this topic to receive related messages.
- es.nachobrito.amica.system.event.v1: agents that need to respond to system events will subscribe to this topic.
Additional topics are created to handle responses to user requests.
When a user sends a request, the bus routes it to all the agents and creates a new topic for their responses, so that the client app can show them to the user:
@Override
public void send(Message<?> message, MessageConsumer<AgentResponse> responseConsumer) {
  var mqttMessageTopic = toMqttTopic(message.topic());
  var mqttResponsesTopic = toMqttResponsesTopic(message.id());
  var responsesTopic = MessageFactory.fromTopic(mqttResponsesTopic);
  var client = getClient(message);
  // subscribe for message responses with a ConversationBuffer
  var buffer = new ConversationBuffer(responseConsumer, () -> {
    unRegisterConsumer(responsesTopic, AgentResponse.class);
  });
  registerConsumer(responsesTopic, AgentResponse.class, buffer::accept);
  // publish message so agents will be able to respond
  publishMessage(client, mqttMessageTopic, message).thenCompose(publishResult -> client.disconnect());
}
The ConversationBuffer class is required in this case because the MQTT protocol does not (always) guarantee that the messages are received in the same order they are published. So we use this class that keeps a buffer and sorts the messages in case they come out of order, thanks to a sequence number that the agents include in every response message.
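The idea behind such a buffer can be sketched in a few lines. This is not the actual ConversationBuffer code: `Chunk` and `ReorderingBuffer` are hypothetical names, and I am assuming that sequence numbers start at 0 and grow by one per response message.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for an agent response: sequence number, text, and "last message" flag.
record Chunk(int sequence, String text, boolean complete) {}

final class ReorderingBuffer {
  private final Map<Integer, Chunk> pending = new HashMap<>();
  private final Consumer<Chunk> downstream;
  private final Runnable onComplete;
  private int nextExpected = 0; // assumption: sequences start at 0

  ReorderingBuffer(Consumer<Chunk> downstream, Runnable onComplete) {
    this.downstream = downstream;
    this.onComplete = onComplete;
  }

  synchronized void accept(Chunk chunk) {
    if (chunk.sequence() < nextExpected) {
      return; // already delivered: drop duplicates
    }
    pending.put(chunk.sequence(), chunk);
    // Flush every chunk that is now contiguous with what was already delivered.
    Chunk next;
    while ((next = pending.remove(nextExpected)) != null) {
      nextExpected++;
      downstream.accept(next);
      if (next.complete()) {
        onComplete.run(); // e.g. unsubscribe from the responses topic
      }
    }
  }
}

final class ReorderingBufferDemo {
  public static void main(String[] args) {
    var buffer = new ReorderingBuffer(
        chunk -> System.out.print(chunk.text()),
        () -> System.out.println());
    // Messages arrive out of order but are printed in sequence order.
    buffer.accept(new Chunk(1, "world", true));
    buffer.accept(new Chunk(0, "hello ", false));
  }
}
```

The completion callback only fires once the final chunk has been delivered in order, so the consumer never sees the end of the conversation before the messages that precede it.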
IMPORTANT - This implementation leaves all the messaging logic on the agent side. This is OK for a PoC, but leaves critical functionalities like authorization or monitoring unimplemented. In a production-ready implementation, instead of having an implementation of MessageBus subscribing directly to the broker topics, agents would interact with a server-side component that would take care of those functionalities. But this would happen behind the MessageBus interface, transparent to the agent code.
Another critical role of the message bus is transmitting system events, as agents do not only respond to user requests, but also take proactive action when the context changes. As an example, we publish an event whenever a conversation between an agent and the user is complete:
@Override
public void respond(
    MessageId originalMessageId,
    Message<AgentResponse> response) {
  var topic = toMqttResponsesTopic(originalMessageId);
  var client = getClient(response);
  var future = publishMessage(client, topic, response);
  future.thenAccept(publishResult -> client.disconnect());
  if (Boolean.TRUE.equals(response.payload().isComplete())) {
    logger.info(
        "Conversation {} closed. Publishing ConversationEnded system event.",
        response.conversationId().value());
    send(
        Message
            .systemEvent(
                new ConversationEnded(
                    response.conversationId().value())));
  }
}
As we will discuss later, one of the agents in this system will listen for these events to implement some logic without user intervention.
Shared Memory
The next shared resource in our multi-agent setup is memory, that is, data that is useful to different agents regardless of which one generated it. This might include things like:
- Conversations between the user and one or more agents, which would have them as a shared context.
- Cached results of shared tool execution.
- Different data items about the environment, like a registry of active agents, users, etc.
For all these kinds of resources, this setup uses Valkey (a Redis fork started in 2024 after Redis changed its license).
As with the MessageBus, there is a Memory interface that allows different implementations to be used depending on the context (e.g. in-memory for testing, different storage providers, etc.). Currently the interface defines a single method, used to retrieve a particular conversation:
public interface Memory {
  Conversation getConversation(ConversationId conversationId);
}
Conversation is also an interface:
public interface Conversation {
  ConversationId getId();
  List<ConversationMessage> getMessages();
  void add(ConversationMessage conversationMessage);
}
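As an illustration of the in-memory option mentioned above, the two interfaces could be backed by plain collections for tests. This is a sketch under assumptions: `ConversationId` and `ConversationMessage` are simplified stand-ins for the SDK types (the real ones may carry more fields), `InMemoryMemory` and `InMemoryConversation` are hypothetical names, and I am assuming that asking for an unknown conversation creates an empty one.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Simplified stand-ins for the SDK types, so the sketch compiles on its own.
record ConversationId(String value) {}
record ConversationMessage(String author, String text) {}

interface Conversation {
  ConversationId getId();
  List<ConversationMessage> getMessages();
  void add(ConversationMessage conversationMessage);
}

interface Memory {
  Conversation getConversation(ConversationId conversationId);
}

// Hypothetical test double: keeps the messages of one conversation in a thread-safe list.
final class InMemoryConversation implements Conversation {
  private final ConversationId id;
  private final List<ConversationMessage> messages = new CopyOnWriteArrayList<>();

  InMemoryConversation(ConversationId id) {
    this.id = id;
  }

  @Override
  public ConversationId getId() {
    return id;
  }

  @Override
  public List<ConversationMessage> getMessages() {
    return List.copyOf(messages); // defensive, immutable snapshot
  }

  @Override
  public void add(ConversationMessage conversationMessage) {
    messages.add(conversationMessage);
  }
}

// Hypothetical test double: one conversation per id, created on first access (an assumption).
final class InMemoryMemory implements Memory {
  private final Map<ConversationId, Conversation> conversations = new ConcurrentHashMap<>();

  @Override
  public Conversation getConversation(ConversationId conversationId) {
    return conversations.computeIfAbsent(conversationId, InMemoryConversation::new);
  }
}
```

Because agents only depend on the Memory interface, a test can inject `InMemoryMemory` and run without a Valkey container.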
The Valkey-based implementation stores messages in a list:
public class ValkeyConversation implements Conversation {
  private final ConversationId conversationId;
  private final JedisPool jedisPool;
  private final MessageSerializer messageSerializer;
  private static final long CONVERSATION_TIMEOUT = Duration.ofHours(1).toMillis();
  ...
  @Override
  public List<ConversationMessage> getMessages() {
    try (var jedis = jedisPool.getResource()) {
      var conversationKey = getConversationKey();
      // read the whole list, from the first element (0) to the last one (-1)
      var items = jedis.lrange(conversationKey, 0, -1);
      return items
          .stream()
          .map(messageSerializer::deSerializeMessage)
          .toList();
    }
  }

  @Override
  public void add(ConversationMessage conversationMessage) {
    try (var jedis = jedisPool.getResource()) {
      var conversationKey = getConversationKey();
      var serialized = messageSerializer
          .serializeMessage(conversationMessage);
      jedis.rpush(conversationKey, serialized);
      // CONVERSATION_TIMEOUT is in milliseconds, so use pexpire (expire takes seconds)
      jedis.pexpire(conversationKey, CONVERSATION_TIMEOUT);
    }
  }
}
Tool support
Agents use tools to act. Defining tools in a standard, tech-agnostic way makes discovery and execution easier.
A tool is basically a function, but it also needs some logic to describe itself, so that the agents can decide when and how to use it. This is the Tool interface:
public interface Tool<A, R> {
  String getDescription();
  Class<A> getArgumentClass();
  Class<R> getResultClass();
  R execute(A args);

  default String getName() {
    return getClass().getSimpleName();
  }
}
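To show what implementing this contract might look like, here is a sketch of a tool that reports the system load average. `LoadAverageTool` and `NoArguments` are hypothetical names chosen for this example, and the tools in the real agents may be written differently. The Tool interface is repeated so that the sketch compiles on its own.

```java
import java.lang.management.ManagementFactory;

// Same shape as the Tool interface above, repeated to keep the sketch self-contained.
interface Tool<A, R> {
  String getDescription();
  Class<A> getArgumentClass();
  Class<R> getResultClass();
  R execute(A args);

  default String getName() {
    return getClass().getSimpleName();
  }
}

// Hypothetical empty argument type for tools that need no input.
record NoArguments() {}

// Hypothetical tool, for illustration only.
final class LoadAverageTool implements Tool<NoArguments, Double> {
  @Override
  public String getDescription() {
    return "Returns the system load average for the last minute.";
  }

  @Override
  public Class<NoArguments> getArgumentClass() {
    return NoArguments.class;
  }

  @Override
  public Class<Double> getResultClass() {
    return Double.class;
  }

  @Override
  public Double execute(NoArguments args) {
    // The JDK returns a negative value when the platform does not expose a load average.
    return ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
  }
}

final class LoadAverageToolDemo {
  public static void main(String[] args) {
    var tool = new LoadAverageTool();
    System.out.println(tool.getName() + ": " + tool.execute(new NoArguments()));
  }
}
```

The description is what the agent (or the tool manager) matches against a user query, so it is worth writing it in the vocabulary users are likely to use.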
Tool Manager service
The ToolManager interface defines the contract for a service that knows how to identify the relevant tools for a particular user request:
public interface ToolManager {
  Set<Tool<?, ?>> getRelevantTools(String userQuery);
  Optional<Tool<?, ?>> getTool(String name);
}
In this project there is a particular implementation that uses an in-memory Lucene index to find tools that relate to a user query. Other mechanisms can be explored, like semantic searching.
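To illustrate the contract without pulling in Lucene, the following sketch selects tools by plain keyword overlap between the query and each tool description. It is a deliberately naive replacement for the Lucene-based implementation, not a copy of it: `KeywordToolManager` and `DescribedTool` are hypothetical names, and the "ignore words of three letters or fewer" rule is an arbitrary assumption.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

// Same shape as the SDK interfaces, repeated to keep the sketch self-contained.
interface Tool<A, R> {
  String getDescription();
  Class<A> getArgumentClass();
  Class<R> getResultClass();
  R execute(A args);
  default String getName() { return getClass().getSimpleName(); }
}

interface ToolManager {
  Set<Tool<?, ?>> getRelevantTools(String userQuery);
  Optional<Tool<?, ?>> getTool(String name);
}

// Hypothetical helper: a tool defined only by a name and a description.
record DescribedTool(String name, String description) implements Tool<String, String> {
  @Override public String getDescription() { return description; }
  @Override public Class<String> getArgumentClass() { return String.class; }
  @Override public Class<String> getResultClass() { return String.class; }
  @Override public String execute(String args) { return name + "(" + args + ")"; }
  @Override public String getName() { return name; }
}

// Hypothetical manager: keyword overlap instead of a Lucene index.
final class KeywordToolManager implements ToolManager {
  private final Map<String, Tool<?, ?>> toolsByName = new LinkedHashMap<>();

  KeywordToolManager(List<Tool<?, ?>> tools) {
    tools.forEach(tool -> toolsByName.put(tool.getName(), tool));
  }

  @Override
  public Set<Tool<?, ?>> getRelevantTools(String userQuery) {
    Set<String> queryTerms = terms(userQuery);
    // A tool is relevant when its description shares at least one term with the query.
    return toolsByName.values().stream()
        .filter(tool -> !Collections.disjoint(terms(tool.getDescription()), queryTerms))
        .collect(Collectors.toSet());
  }

  @Override
  public Optional<Tool<?, ?>> getTool(String name) {
    return Optional.ofNullable(toolsByName.get(name));
  }

  // Lower-cases the text, splits on non-alphanumerics and drops very short words.
  private static Set<String> terms(String text) {
    return Arrays.stream(text.toLowerCase(Locale.ROOT).split("[^a-z0-9]+"))
        .filter(term -> term.length() > 3)
        .collect(Collectors.toSet());
  }
}
```

A real index adds stemming, stop words and ranking, which is where Lucene or a semantic search would earn its place, but the interface seen by the agents stays the same.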
Tool Executor service
Centralized tool execution is provided by an implementation of the ToolExecutor interface, which allows both synchronous and asynchronous tool invocation:
public interface ToolExecutor {
  <A, R> R execute(Tool<A, R> tool, A arguments);
  <A, R> CompletableFuture<R> executeAsync(Tool<A, R> tool, A arguments);
}
In this case, the default implementation is pretty simple:
public class DefaultToolExecutor implements ToolExecutor {
  @Override
  public <A, R> R execute(Tool<A, R> tool, A arguments) {
    return tool.execute(arguments);
  }

  @Override
  public <A, R> CompletableFuture<R> executeAsync(Tool<A, R> tool, A arguments) {
    return CompletableFuture.supplyAsync(() -> tool.execute(arguments));
  }
}
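One thing the asynchronous variant makes possible is bounding how long a caller waits for a tool. The sketch below is an assumption about how a caller might do that, not something the SDK provides: `AsyncToolCall` and `callWithTimeout` are hypothetical names, and a plain `Function` stands in for `Tool.execute` to keep the example self-contained.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical helper, for illustration only.
final class AsyncToolCall {
  private AsyncToolCall() {}

  // Runs the tool off the caller thread; yields the fallback if it fails or takes too long.
  static <A, R> CompletableFuture<R> callWithTimeout(
      Function<A, R> tool, A arguments, long timeoutMillis, R fallback) {
    return CompletableFuture
        .supplyAsync(() -> tool.apply(arguments))
        .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
        .exceptionally(error -> fallback);
  }
}

final class AsyncToolCallDemo {
  public static void main(String[] args) {
    // A fast "tool" completes normally and its result is printed.
    Integer doubled = AsyncToolCall.<Integer, Integer>callWithTimeout(x -> x * 2, 21, 1_000, -1).join();
    System.out.println(doubled);
  }
}
```

Note that `orTimeout` only completes the future exceptionally. It does not interrupt the tool, which keeps running in the background until it returns.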
The Agents
Now that we have shared MessageBus, Memory, ToolManager and ToolExecutor, we can define what an Agent is in this platform:
public interface Agent {
  AgentDetails getDetails();

  void onUserMessage(Message<UserRequest> userRequestMessage);

  void onSystemMessage(Message<? extends SystemEvent> systemMessage);

  List<Class<? extends SystemEvent>> getAcceptedSystemMessages();

  default void register(MessageBus bus) {
    var agentDetails = getDetails();
    if (agentDetails.listenUserRequests()) {
      bus.registerConsumer(MessageTopic.USER_REQUESTS, UserRequest.class, this::onUserMessage);
    }
    if (agentDetails.listenSystemEvents()) {
      getAcceptedSystemMessages()
          .forEach(type ->
              bus.registerConsumer(MessageTopic.SYSTEM_EVENTS, type, this::onSystemMessage));
    }
  }
}
An Agent describes itself by returning an AgentDetails record:
public record AgentDetails(
    String agentId,
    String agentName,
    boolean listenUserRequests,
    boolean listenSystemEvents) {
  private static final Pattern idPattern = Pattern.compile("^[a-z0-9\\-]{5,}$");

  public AgentDetails {
    validateAgentId(agentId);
  }

  private void validateAgentId(String agentId) {
    if (!idPattern.matcher(agentId).matches()) {
      throw new IllegalArgumentException("Agent id does not match %s".formatted(idPattern.pattern()));
    }
  }
}
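A quick way to see the id validation at work is to build one valid and one invalid record. The snippet repeats the record (with the validation helper made static, a small variation of mine) so that it runs on its own; the agent names are only examples.

```java
import java.util.regex.Pattern;

// Copy of the AgentDetails record above; only the helper was made static.
record AgentDetails(
    String agentId,
    String agentName,
    boolean listenUserRequests,
    boolean listenSystemEvents) {
  private static final Pattern idPattern = Pattern.compile("^[a-z0-9\\-]{5,}$");

  AgentDetails {
    validateAgentId(agentId);
  }

  private static void validateAgentId(String agentId) {
    if (!idPattern.matcher(agentId).matches()) {
      throw new IllegalArgumentException("Agent id does not match %s".formatted(idPattern.pattern()));
    }
  }
}

final class AgentDetailsDemo {
  public static void main(String[] args) {
    // Lower-case letters, digits and hyphens, at least five characters: accepted.
    var valid = new AgentDetails("local-system-agent", "Local System Agent", true, false);
    System.out.println("Accepted: " + valid.agentId());
    try {
      // Upper-case letters and fewer than five characters: rejected by the compact constructor.
      new AgentDetails("Bad", "Bad agent", true, false);
    } catch (IllegalArgumentException e) {
      System.out.println("Rejected: " + e.getMessage());
    }
  }
}
```

Restricting ids to this character set keeps them safe to embed in places like topic names or storage keys, although that use is my assumption rather than something the SDK requires.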
If an agent needs to listen to system events, it will also implement the getAcceptedSystemMessages method, returning a list of the payload types it is interested in.
An agent is also able to register itself to a MessageBus, subscribing to the required topics.
Apart from that, the Agent defines two methods to listen for the main types of messages in the MessageBus:
- onUserMessage for requests from the user.
- onSystemMessage for system events.
The definition of Agent closes this journey through the artifacts included in the A.M.I.C.A. Java SDK. Let’s see three projects that use this SDK to interact with the platform: two different agents and a CLI application.
Local System Agent
The Local System Agent uses a local LLM and tools to answer system questions, like user count or load average.
In particular, it uses LangChain4J’s AI Services functionality and Jlama integration to build an agent that listens for user request messages.
The AIService interface is pretty simple:
public interface AIAssistant {
  @UserMessage(fromResource = "prompt.md")
  TokenStream chat(
      @MemoryId String memoryId, @UserName String userName, @V("user_message") String userMessage);
}
The chat method accepts the following parameters:
- The memoryId, so that the MemoryProvider will store messages in the right conversation.
- The user name, as it is required in the user request messages payload.
- The user message, which replaces the corresponding placeholder in the prompt.
The prompt is read automatically from the prompt.md file:
# Identity
You are an assistant specialized on providing detailed information about the current system, such as cpu load, memory
usage or running processes. You MUST use the provided tools to get the information requested.
# Instructions
- Current date is {{current_date_time}}
- Use the functions — don’t guess.
Respond to the following user message:
{{user_message}}
Most of the artifacts in the agent fall into one of these categories:
- Micronaut bean factories to register artifacts into the dependency injection, for example:
- Micronaut-specific implementations of the interfaces in the SDK, for example:
- Factories to translate the A.M.I.C.A. concepts to the equivalent LangChain4J ones, for example:
With these resources, the AIAssistantFactory creates an instance of an AIService for the agent to use.
The LocalSystemAgent class implements the Agent interface to answer the user’s questions about the local system:
...
private AIAssistant getAssistant() {
  if (assistant == null) {
    assistant = AIAssistantFactory.with(
        memory,
        toolManager,
        toolExecutor,
        agentDetails);
  }
  return assistant;
}
...
@Override
public void onUserMessage(Message<UserRequest> userMessage) {
  var payload = userMessage.payload();
  TokenStream tokenStream =
      getAssistant()
          .chat(
              userMessage.conversationId().value(),
              payload.userName(), payload.message()
          );
  var sequence = new AtomicInteger();
  tokenStream
      .onPartialResponse(this::onPartialResponse)
      .onPartialThinking(this::onPartialThinking)
      .onRetrieved(this::onContentRetrieved)
      .onIntermediateResponse(this::onIntermediateResponse)
      .onToolExecuted(this::onToolExecuted)
      .onCompleteResponse(
          (ChatResponse response) ->
              publishCompleteResponse(
                  userMessage,
                  response,
                  sequence.getAndIncrement()))
      .onError((Throwable error) -> onError(userMessage, error))
      .start();
}
...
private void publishCompleteResponse(
    Message<UserRequest> userMessage,
    ChatResponse response,
    int sequenceNumber) {
  messageBus.respond(
      userMessage.id(),
      Message.responseTo(
          userMessage,
          new AgentResponse(
              response.aiMessage().text(),
              ZonedDateTime.now(),
              true,
              new SequenceNumber(sequenceNumber))));
}
Minute Taker Agent
The Minute Taker Agent is very similar in structure to the Local System Agent. Instead of answering user requests, this agent listens for the ConversationEnded system event and creates a JSON file with information about it:
@Override
public void onSystemMessage(Message<? extends SystemEvent> message) {
  if (message.payload() instanceof ConversationEnded conversationEnded) {
    onConversationEnded(conversationEnded);
  }
}

private void onConversationEnded(ConversationEnded conversationEnded) {
  var conversation =
      memory.getConversation(
          new ConversationId(conversationEnded.conversationId()));
  if (conversation.getMessages().isEmpty()) {
    logger.error(
        "Conversation {} does not contain any message. No summary to generate!",
        conversationEnded.conversationId());
    return;
  }
  var text =
      conversation.getMessages().stream()
          .filter(this::includeMessageInMinute)
          .map(this::buildStringView)
          .collect(Collectors.joining("\n"));
  getAssistant()
      .chat(text)
      .onCompleteResponse(response ->
          saveMinute(response.aiMessage().text(), conversation))
      .onPartialResponse(logger::debug)
      .onError(it -> logger.error(it.getMessage(), it))
      .start();
}
In this case, the prompt is hard-coded in the AIService interface:
public interface AIAssistant {
  @UserMessage(
      """
      # Identity
      You are an assistant specialized in generating minutes from conversations between the user and other agents. Your
      specialty is to create a document describing a conversation.
      # Instructions
      Generate minutes in a narrative style, using sentences starting with "The user asked ", or "The agent response was ".
      You MUST include in the minute EVERY MESSAGE in the conversation. All the user messages and the agent responses.
      Respond using the following JSON format:
      {
        "actors": an array with the names of the participants,
        "summary": a one-sentence summary, what the conversation was about,
        "messages": an array with the original messages in the conversation
      }
      DON'T include any additional text, just the JSON.
      Generate minute for the following conversation:
      ----
      {{conversation}}
      ----
      """)
  TokenStream chat(@V("conversation") String conversation);
}
The CLI application
The amica-cli application is very simple, just a Micronaut CLI application (based on picocli) that uses the MessageBus to publish user requests and listen for the response messages:
@SerdeImport(UserRequest.class)
@SerdeImport(AgentResponse.class)
@SerdeImport(SequenceNumber.class)
@Command(name = "cli", description = "...", mixinStandardHelpOptions = true)
public class CliCommand implements Runnable {
  @Inject MessageBus messageBus;

  @Option(
      names = {"-v", "--verbose"},
      description = "...")
  boolean verbose;

  public static void main(String[] args) throws Exception {
    PicocliRunner.run(CliCommand.class, args);
  }

  public void run() {
    var console = System.console();
    var userInput = console.readLine();
    var userName = System.getProperty("user.name");
    try (var writer = console.writer()) {
      // readLine() returns null at end of input, so stop instead of dereferencing it
      while (userInput != null && !"exit".equals(userInput)) {
        if (!userInput.isBlank()) {
          var userMessage =
              Message.userRequest(
                  new UserRequest(ZonedDateTime.now(ZoneOffset.UTC), userName, userInput));
          messageBus.send(
              userMessage,
              response -> {
                writer.print(response.payload().message());
                if (response.payload().isComplete()) {
                  writer.println("\n---");
                }
                writer.flush();
              });
        }
        userInput = console.readLine();
      }
    }
    System.exit(0);
  }
}
How to run A.M.I.C.A.
Now that we have covered all the moving parts, let’s see the platform in action. Check out the project to your local environment, and follow these steps:
1. Start the services:
docker compose up -d
This will start two containers:
- A Mosquitto MQTT message broker that will be used as Message Bus.
- A Valkey cache that will be used as a shared memory.
2. Compile the Java SDK project:
cd amica-sdk-java
mvn clean install
3. Start the agents (each one in a different terminal):
cd agents/local-system-agent
mvn mn:run
cd agents/minute-taker-agent
mvn mn:run
4. Start the CLI (in a new terminal):
cd amica-cli
mvn mn:run
Once in the CLI application, you can send queries, for example:
> How many users are currently logged into this system?
The local-system-agent will use the UserCount tool to get the number of users currently logged in, and will respond with something like this:
There are currently 2 users logged into this system.
You will also see how the minute-taker-agent detects that the conversation has ended and generates minutes for it:
10:13:55.198 [RxComputationThreadPool-1] INFO e.n.a.a.m.i.l.agent.MinuteTakerAgent - Saving minute for conversation ebc5d308-8fba-4a61-af09-913dd0766eb8 to ...
10:13:55.198 [RxComputationThreadPool-1] INFO e.n.a.a.m.i.l.agent.MinuteTakerAgent -
{
"actors": ["nacho", "local-system-agent"],
"summary": "The user asked how many users are currently logged into this system.",
"messages": [
"The user asked How many users are currently logged into this system",
"The agent response was There are currently 2 users logged into this system."
]
}
Conclusion
As I mentioned at the beginning of the article, A.M.I.C.A. is a proof of concept, a project created not to be production-ready, but to demonstrate a set of principles. My goal was to explore how to build a multi-agent, asynchronous system where agents built with different technologies can interact and react to events. I hope you found this article useful in your own journey.
And, of course, the A.M.I.C.A. name was proposed by ChatGPT :-)