More work fixing bugs?

Vibe coding is the new AI thing advertised on social media. Unsurprisingly we have all the expected marketing material, the influencer posts, the drama of security issues and some pushback.

Frustrated incident manager, engineer or entrepreneur? You choose!

Obviously the problem with AI generated code is the bugs it introduces, the security issues, the outdated code it was trained on etc. We all know that, and we know that it won’t go away.
So did everyone lose their minds and wager on the AI slot machine because its code can sometimes be OK?
Of course not.
Organisations avoid pushing unvetted code to production, whether it is AI generated or human created. Every deployment that goes south comes along with incidents, post-mortems, claims, regulatory issues and even legal fees. There is no such thing as pushing code to prod lightheartedly, whether it comes from an intern, a dev agency or some LLM.

And while the above stands correct, it does not prevent apps developed with the help of LLMs from getting deployed out there. LLMs are used and will continue to be used. A big part of the Vibe Coding bragging comes from apps that solo entrepreneurs develop, or try to develop. This is, and will remain, a huge chapter of AI-generated code.

First we have to exclude a subset of the greenfield software that gets developed. For example you might use AI to create a data warehouse solution from scratch and utilise LSM indexes. Or you might want to apply the Raft consensus algorithm to a solution you created. If you operate at that level, you are already specialized in a field and you are sophisticated enough to know better. Even with AI based assistance you are highly unlikely to deliver a dud. Proper checks regarding quality and security would be in place, thus the future maintenance opportunities due to AI bugs are not there.

Reddit is actually a very nice place where you can see some real life stories of individuals embarking on creating their own software business. Most readers of this blog and I probably have a degree in engineering and software is our profession, yet it’s not that often that we get to see that reality. There are many individuals with zero relation to coding, let alone software engineering, trying to get into the business of software. It all starts with an idea, followed by all the steps needed to turn it into a software business. Usually this is resource intensive since someone needs to be paid to do the coding. The funds can come in the form of angel investment, personal finances or maybe a relative’s finances. Once the capital is secured a developer or dev agency is hired to build the MVP, the MVP is rolled out, demos are successful, customers love the product, hooray what a success!

Too optimistic right?

Now take a look at the cemetery. It is quite difficult to do so because people who fail do not seem to write memoirs, and, if they did, those business publishers I know would not even consider giving them the courtesy of a returned phone call (as to returned e-mail, fuhgedit). Readers would not pay $26.95 for a story of failure, even if you convinced them that it had more useful tricks than a story of success.* The entire notion of biography is grounded in the arbitrary ascription of a causal relation between specified traits and subsequent events. Now consider the cemetery. The graveyard of failed persons will be full of people who shared the following traits: courage, risk taking, optimism, et cetera. Just like the population of millionaires. There may be some differences in skills, but what truly separates the two is for the most part a single factor: luck. Plain luck.

Taleb. Black Swan

Obviously we don’t get to know the actual failure rate of the attempts to turn an idea into a software business. However we do have an idea how expensive it is to fail when you embark on making a software business, and a big contributor to that is the cost of developers. On top of that there is also the hidden cost of the time someone spends on their idea preventing them from working/gaining experience on other activities.

This makes software a longshot, an expensive longshot.
The odds of failure are high and the costs are considerable. From that perspective AI tooling makes total sense. Since the odds are against you, at least you need to keep the costs low.

If you think about it, the highly likely scenario of failure removes the need for any maintenance or bug fixing:

  • New app is developed using AI assistance
  • New app fails
  • Funding of failed app is ceased
  • Failed app and its codebase are declared dead
  • No further funds are spent on a dead project

There is still the scenario of a successful MVP. Provided your initial attempts at rolling out your idea are successful, fixing AI bugs is a privilege. It’s tempting to think that these successes will lead to more software in need of maintenance.

I believe this won’t be the case. If it’s easy for everyone to create software, then even after a successful MVP launch you still have lots of competition to deal with, thus more effort for less reward. A bad ratio of risk versus reward will prevent a product from being developed in the first place. At least for the products we’ve seen showcased using Vibe coding.

Equilibrium?

Do I believe engineers will have more work due to bugs? Not really. I do think there’s gonna be some AI inflicted pain but not as bad as mentioned. Indeed software could be developed with fewer resources, but the ease of doing so makes the case for building it questionable. If anyone can do it, is it worth developing that idea in the first place? Regardless, attempts to turn ideas into software will be there, and should these attempts succeed, engineers will be there to help.

 

OpenCode, LM Studio and Qwen Code

Just like many devs I’ve been on my LLM honeymoon with Claude Code. So far things are smooth, with some hiccups like rate limiting and sometimes lag. The thing is you don’t always need to use the model with the most parameters; there are many options out there for the day to day tasks. Also Claude Code might be great and can work with multiple models, including the open source ones, but it is still a closed source product.

And this is where OpenCode, Qwen Code and LM Studio kick in.

 

I am pretty fascinated by local LLMs. A GPU with 24 GB of memory can take you very far. You can run some local models with good quality results. No restrictions apart from the GPU’s limits. There are also benefits like the fast responses and the fact that the data stays with you.

My initial attempt was with Ollama, yet the combination was not very encouraging, thus I went for LM Studio.

I run local models on my personal server, thus I had to make some tweaks there.

The first step is installing the no-GUI distribution. This can be done in a user’s home directory.

curl -fsSL https://lmstudio.ai/install.sh | bash

Now that LM Studio is installed we can use `qwen3-coder-30b`. It is a good model and has tools support.
What is the tools capability in a model? Tools is the capability of an LLM to interact with external systems, APIs, or software to perform tasks beyond simple text generation.
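To make the tools idea more concrete, here is a minimal sketch of a chat request that advertises one tool to the model through the OpenAI-compatible API that LM Studio exposes. The `get_weather` function and its `city` parameter are hypothetical and exist only for illustration; the server address is the one used later in this post. The model never runs the tool itself: when it decides a tool is needed, its reply names the function and the arguments, and the client (OpenCode in our case) executes it and sends the result back.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class ToolCallRequestSketch {

    // Address of the LM Studio server used later in this post; adjust to your own.
    static final String BASE_URL = "http://192.168.1.171:1234/v1";

    // OpenAI-style chat request that advertises a single tool to the model.
    // "get_weather" and its "city" parameter are hypothetical.
    static String requestBody() {
        return """
            {
              "model": "qwen/qwen3-coder-30b",
              "messages": [
                {"role": "user", "content": "What is the weather in Athens?"}
              ],
              "tools": [
                {
                  "type": "function",
                  "function": {
                    "name": "get_weather",
                    "description": "Returns the current weather for a city",
                    "parameters": {
                      "type": "object",
                      "properties": {"city": {"type": "string"}},
                      "required": ["city"]
                    }
                  }
                }
              ]
            }
            """;
    }

    // Builds the HTTP request without sending it; sending is left to the caller.
    static HttpRequest buildRequest() {
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/chat/completions"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(requestBody()))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = buildRequest();
        System.out.println(request.method() + " " + request.uri());
        System.out.println(requestBody());
    }
}
```

Agents like OpenCode build requests of this shape for you; the sketch is only meant to show what "tools support" means on the wire.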

Let’s load the model

lms get qwen/qwen3-coder-30b
lms load qwen/qwen3-coder-30b
lms chat qwen/qwen3-coder-30b

Once we see the prompt it means our model is downloaded and ready to be used.

Now we create the systemd configuration. This way, if our server restarts, LM Studio will be there for us with the model loaded.

[Unit]
Description=LM Studio Server
[Service]
Type=oneshot
RemainAfterExit=yes
User=username
Environment="HOME=/home/username"
ExecStartPre=/home/username/.lmstudio/bin/lms daemon up
ExecStartPre=/home/username/.lmstudio/bin/lms load qwen/qwen3-coder-30b --yes
ExecStart=/home/username/.lmstudio/bin/lms server start --bind 0.0.0.0
ExecStop=/home/username/.lmstudio/bin/lms daemon down
[Install]
WantedBy=multi-user.target

username is a placeholder for the user on your Linux server

We can now run the routine systemd commands

sudo systemctl daemon-reload
sudo systemctl enable lmstudio.service
sudo systemctl start lmstudio.service

We can test if the server is reachable with a curl request

curl http://192.168.1.171:1234/v1/models

Also take note that we bind the server to 0.0.0.0, thus it will be accessible from our local network.
Our server is set up, now it’s time to configure OpenCode to use it.
OpenCode is an open source AI agent. It has many functionalities similar to Claude Code.

At the root of the project we can create the file opencode.json containing the following configuration. This way we point to our server running the models.

{
  "$schema": "https://opencode.ai/config.json",
  "permission": {
    "read": "allow",
    "edit": "ask",
    "bash": "ask",
    "postgresql*": "ask"
  },
  "provider": {
    "qwen": {
      "npm": "@ai-sdk/openai-compatible",
      "options": {
        "baseURL": "http://192.168.1.171:1234/v1"
      },
      "models": {
        "qwen3-coder-30b": {
          "id": "qwen/qwen3-coder-30b",
          "reasoning": true,
          "tool_call": true
        }
      }
    }
  }
}

Agents MD

Next we can create AGENTS.md files.
For example

# local-llm-test project

This is a maven based java project

## Project Structure

- `src/main/java/com/egkatzioura/ai` - source code files

Skills

Skills are also supported. You can put the skill files under the .opencode folder.

Example of a skill file .opencode/skills/immutable-java-object/SKILL.md

---
name: immutable-java-object
description: Create an immutable java object
license: MIT
compatibility: opencode
---

Create a Java immutable object using the class name provided and the fields
package is at ./src/main/java/com/egkatzioura/ai/models
use lombok toBuilder=true

MCP

Last but not least we can put mcp configurations at the opencode.json file.

{
...
  "mcp": {
    "postgresql": {
      "type": "remote",
      "url": "http://127.0.0.1:8000/sse",
      "enabled": true
    }
  },
...
}

That’s it, we went full local and open source. So far I am pretty satisfied with the whole experience, both how fast it was and the results.

So much magic flowing from that GPU.

 

 

Lombok ExtensionMethod or else Fight fire with Fire

I love rich models. I enjoy having something cohesive that can be re-used and has the much needed logic in the right place.

But we don’t live in a perfect world: introducing autogeneration.
You got jOOQ, then you get Swagger generated models. For sure you can plug in some custom code but that can be cumbersome.

If you’ve been into Scala, like I used to, you know that you can add a method to an existing type using implicit classes.
And that’s what I wished for when I stumbled on generated models that I needed to add some logic to.

 

 

This is where Lombok’s ExtensionMethod comes to the rescue.

First things first, the Lombok dependency is needed

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.24</version>
    <scope>provided</scope>
</dependency>

Now assuming we have a class imported from a library

package com.egkatzioura.external.models;

public class CustomModel {

...
}

Assuming the class resides on an imported jar we cannot add any method to it.
We shall create a class with a static method whose input is the model of interest.

package com.egkatzioura.utils;

import com.egkatzioura.external.models.CustomModel;

public class CustomModelUtil {

  // The first parameter is the type that receives the extension method
  public static void yourCustomMethod(CustomModel customModel) {
    System.out.println("custom method called");
  }
}

So far we did not do anything with Lombok, so you might wonder how we apply it.
Essentially you need to apply the annotation to each class where you want to call the method as if it was part of the original class specification.

package com.egkatzioura.models;

import lombok.experimental.ExtensionMethod;
import com.egkatzioura.external.models.CustomModel;
import com.egkatzioura.utils.CustomModelUtil;

@ExtensionMethod({CustomModelUtil.class})
public class Application {

  public static void main(String[] args) {
    CustomModel customModel = new CustomModel();
    // Lombok rewrites this to CustomModelUtil.yourCustomMethod(customModel)
    customModel.yourCustomMethod();
  }
}
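It can help to see what such a call boils down to. As far as I understand it, Lombok resolves an extension method call at compile time into a plain static call that passes the receiver as the first argument. The sketch below uses no Lombok at all and simply writes that static call by hand; the `describe` method and the `name` field are hypothetical, made up for the example.

```java
class ExtensionMethodDesugared {

    // Stand-in for a generated or third-party class we cannot modify (hypothetical).
    static class CustomModel {
        private final String name;

        CustomModel(String name) {
            this.name = name;
        }

        String getName() {
            return name;
        }
    }

    // Utility class holding the "extension": a public static method whose
    // first parameter is the type being extended.
    static class CustomModelUtil {
        public static String describe(CustomModel model) {
            return "CustomModel(" + model.getName() + ")";
        }
    }

    public static void main(String[] args) {
        CustomModel model = new CustomModel("demo");
        // With @ExtensionMethod(CustomModelUtil.class) one would write model.describe();
        // without Lombok the equivalent is the plain static invocation below.
        System.out.println(CustomModelUtil.describe(model));
    }
}
```

Nothing is added to the model class itself, which is why the annotation has to be repeated on every class that wants the shorter syntax.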

Now you can have that rich model you craved for 😉 But hold on a sec, we just fixed an issue introduced by generation with more generation :/.
Well, whether this is good or bad I will leave up to you.

PostgreSQL auditing

You know from my blogs that I am a PostgreSQL addict. If there is one thing that made my DBA days easier, it was the mighty PGAudit extension. With PGAudit we can have detailed session and/or object audit logging.

I could expand more on why audit logs are essential under certain circumstances like government, financial, or ISO certification audits, but my focus here is on operations.

Imagine wanting to perform certain database migrations. You can switch traffic to a read replica which you shall promote, but this has the drawback of failed writes. How could you identify which time of day statistically has mainly reads and little or no writes? This is where the aggregation of the PGAudit logs would give you the answer.

Another example is CPU spikes building up during the day, where you want to check the query patterns and look for any correlation.

Furthermore you might want a list of read queries that could be cached. By having PGAudit enabled you can identify the frequency of those queries and then decide how to maximize the caching impact by picking the right queries.

I could go on and on. Overall PGAudit can do wonders.

 

 

So let’s get started.

We need to have a PostgreSQL installation with the extension enabled.
Debian already has a package for PGAudit available. In other cases you need to build the package on your own. Instructions can be found on the official guide.

We use a debian based docker image thus apt-get will do the work

FROM postgres:17
USER root
RUN apt-get update && apt-get install -y postgresql-17-pgaudit
USER postgres

Since PGAudit is installed we can create a custom postgresql configuration enabling it.

listen_addresses = '*'
port = 5432
max_connections = 20
shared_buffers = 128MB
temp_buffers = 8MB
work_mem = 4MB
max_wal_senders = 3
shared_preload_libraries = 'pgaudit'

pgaudit.log = 'all'  
pgaudit.log_catalog = 'off'
pgaudit.log_parameter = 'off'
pgaudit.log_statement_once = 'on'

Let’s put them all together into a docker compose file

version: '3.1'
 
services:
  postgres:
    build: ./
    restart: always
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    volumes:
      - ./postgresql.conf:/etc/postgresql/postgresql.conf
    command:
      - "-c"
      - "config_file=/etc/postgresql/postgresql.conf"
    ports:
      - 5432:5432

To run it, issue

 

docker compose up

Docker Compose V2 is out there with many good features; you can find more about it in the book I authored: A Developer’s Essential Guide to Docker Compose.

Now we can execute some queries and see how they are logged.

$ docker compose exec -it postgres bash
# psql
postgres=# SELECT 1;
 ?column? 
----------
        1
(1 row)

If we check the container logs, we shall see an audit trail of the queries we executed.

postgres-1  | 2025-09-18 23:33:05.166 GMT [271] LOG:  AUDIT: SESSION,1,1,READ,SELECT,,,SELECT 1,<not logged>
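The aggregation mentioned earlier, finding the hours with mostly reads and few writes, can be sketched with a few lines of Java. This is a minimal sketch under some assumptions: each log line carries a `yyyy-MM-dd HH:mm:ss` timestamp before the `AUDIT:` marker (this depends on your `log_line_prefix`), and the fourth comma separated field after the marker is the statement class (READ, WRITE and so on). The class and method names are made up for the example, and the second sample line is invented to show a write being counted.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class AuditLogHourlyStats {

    private static final String MARKER = "AUDIT: ";
    // Captures the hour from a "yyyy-MM-dd HH:mm:ss" timestamp in the log prefix.
    private static final Pattern TIMESTAMP =
            Pattern.compile("\\d{4}-\\d{2}-\\d{2} (\\d{2}):\\d{2}:\\d{2}");

    // Returns hour of day -> (statement class such as READ or WRITE -> number of entries).
    static Map<String, Map<String, Integer>> countByHourAndClass(List<String> lines) {
        Map<String, Map<String, Integer>> stats = new TreeMap<>();
        for (String line : lines) {
            int auditIdx = line.indexOf(MARKER);
            Matcher timestamp = TIMESTAMP.matcher(line);
            if (auditIdx < 0 || !timestamp.find()) {
                continue; // not an audit entry, or an unexpected prefix
            }
            // Fields after the marker: type, statement id, substatement id, class, command, ...
            String[] fields = line.substring(auditIdx + MARKER.length()).split(",", 5);
            if (fields.length < 4) {
                continue;
            }
            stats.computeIfAbsent(timestamp.group(1), hour -> new TreeMap<>())
                 .merge(fields[3], 1, Integer::sum);
        }
        return stats;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                // Line taken from the container log output above
                "postgres-1  | 2025-09-18 23:33:05.166 GMT [271] LOG:  AUDIT: SESSION,1,1,READ,SELECT,,,SELECT 1,<not logged>",
                // Made-up line, only here to show a second class being counted
                "postgres-1  | 2025-09-18 23:40:10.000 GMT [271] LOG:  AUDIT: SESSION,2,1,WRITE,INSERT,,,INSERT INTO table_name (sensitive_field) VALUES ($1),<not logged>");
        System.out.println(countByHourAndClass(lines)); // prints {23={READ=1, WRITE=1}}
    }
}
```

On a real system you would more likely ship the logs to a log platform and aggregate there, but the idea stays the same: group by time bucket and statement class, then look for the quiet write windows.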

Now this looks amazing however there are certain things to take into consideration.

Costs

Audit logs are expensive because of their volume. Before enabling them on production you need to make sure they are stored in a cost effective form of storage.

Resources

They do consume resources from your PostgreSQL instance. Size up your resources carefully.

Caution on Parameters

Do not log the parameters, and if you do, understand that you log the actual data contained in the database. Logging parameters and storing the audit logs in widely accessible storage is likely to result in a data leak. If you do log them, have a good reason and a proper place to store them that satisfies your security needs.

Prevent Data Leaks

If your application is written badly and the queries do not use parameterized statements, the audit logs will lead to data leaks. All queries should be parameterized and should avoid any hardcoded literals.

We have this table

CREATE TABLE table_name (
    id SERIAL PRIMARY KEY,
    sensitive_field TEXT NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

This is a bad query


SELECT *FROM table_name WHERE sensitive_field='data-leak';

This leaks the sensitive field value into the logs


postgres-1 | 2025-09-18 23:44:53.953 GMT [271] STATEMENT: SELECT *FROM table_name WHERE sensitive_field='data-leak';

Instead you can use a parameterized query, just like the following jdbc example

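try (PreparedStatement stmt = conn.prepareStatement("SELECT * FROM table_name WHERE sensitive_field = ?")) {

    // Parameterized query - safe from SQL injection, and the value is not part of the logged statement text
    stmt.setString(1, "data-leak");
    try (ResultSet rs = stmt.executeQuery()) {
        // process the rows
    }
}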

In the logs the information is not leaked.


postgres-1  | 2025-09-19 07:20:39.531 GMT [63] LOG:  AUDIT: SESSION,3,1,READ,SELECT,,,SELECT * FROM table_name WHERE sensitive_field = $1,<not logged>

 

That’s it. Let’s enjoy PostgreSQL, that amazing piece of technology, and handle it with care.

Spring Conditionals

Spring is a powerful framework which serves billions of requests worldwide every minute. One of the things that made it special is its dependency injection capabilities. At the start of the application Spring scans for the classes in the classpath, identifies the configuration classes as well as the classes containing bean related annotations. Conditionals have a pivotal role in deciding which of those beans get created.

There are many reasons why you need to use Conditionals.

Overall think about your modular application. Your application has to operate on various different environments. You got Development, Staging, UAT, Production etc. Your code should be as close to production as it can be, yet there can always be some variations.

What if your application has to work on multiple clouds? In that case a broker class might need different implementations. For example on AWS you want to dispatch messages using SQS, on Azure you shall do so using storage Queues.

Usually we do this by creating an interface specifying the functionality we want (plain old strategy pattern)

public interface MessagePublisher {
    
    void publish(String message);
    
}

SQS implementation

public class SqsMessagePublisher implements MessagePublisher {
    
    public void publish(String message) {
    ...  
    }
    
}

Azure implementation

public class AzureStorageQueuePublisher implements MessagePublisher {
    
    public void publish(String message) {
    ...  
    }
    
}

You could define the implementation to use with a @Configuration bean that checks the defined properties.

@Configuration
public class MessagePublisherConfig {

    @Value("${message.publisher.type:sqs}")
    private String publisherType;

    @Bean
    public MessagePublisher messagePublisher() {
        if (publisherType != null && publisherType.equalsIgnoreCase("azure")) {
            return new AzureStorageQueuePublisher();
        } else {
            // Default to SQS or handle other cases
            return new SqsMessagePublisher();
        }
    }
}

This code works; however, thanks to Spring, there is no need for that if statement.

Conditional on Property

Spring has this problem already sorted with conditional beans.
We shall change our classes by using the ConditionalOnProperty annotation.


@Component
@ConditionalOnProperty(name = "message.publisher.type", havingValue = "azure")
public class AzureStorageQueuePublisher implements MessagePublisher {
    public void publish(String message) {
    ...  
    }
}
@Component
@ConditionalOnProperty(name = "message.publisher.type", havingValue = "sqs")
public class SqsMessagePublisher implements MessagePublisher {
    
    public void publish(String message) {
    ...  
    }
    
}

 

Conditional on class

Now you might think that having both implementations in one jar is kinda bloated. It is likely that a lean jar is better: one jar built with the AWS dependencies and one jar built with the Azure dependencies. Beyond the capabilities of the build tool used (for example profiles on Maven), our codebase should be able to handle any class loading issues.
There is a Conditional annotation based on the presence of classes.

@Component
@ConditionalOnClass(QueueServiceClient.class)
public class AzureStorageQueuePublisher implements MessagePublisher {

    public void publish(String message) {
    }

}
@Component
@ConditionalOnClass(SqsClient.class)
public class SqsMessagePublisher implements MessagePublisher {

    public void publish(String message) {
    }

}

Behind the scenes Spring reads the class definition and checks whether the required class exists on the classpath before proceeding with the instantiation. The above option enables us to have a jar with fewer dependencies that will instantiate the right bean implementations based on the environment.
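The idea of a class presence check can be illustrated in plain Java. This is only a sketch of the concept and not Spring's actual implementation, which is more involved; the `ClassPresenceCheck` class and its `isPresent` method are made up for the example.

```java
class ClassPresenceCheck {

    // Returns true when the named class can be found by the given class loader.
    // Passing initialize=false avoids running static initializers of the probed class.
    static boolean isPresent(String className, ClassLoader classLoader) {
        try {
            Class.forName(className, false, classLoader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ClassLoader classLoader = ClassPresenceCheck.class.getClassLoader();
        // A JDK class is always there
        System.out.println(isPresent("java.util.List", classLoader));
        // The result depends on whether the AWS SDK is on the classpath
        System.out.println(isPresent("software.amazon.awssdk.services.sqs.SqsClient", classLoader));
    }
}
```

The check works on the class name rather than on a class literal, so the code doing the check does not itself need the SDK in order to load.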

ConditionalOnMissingBean

Regardless of the environment we might want to spin up a default implementation of the MessagePublisher, in case certain criteria are not fulfilled. In that case the ConditionalOnMissingBean annotation can help.

@Component
@ConditionalOnMissingBean(MessagePublisher.class)
public class DefaultPublisher implements MessagePublisher {
    @Override
    public void publish(String message) {

    }
}

Simplify

As we can see Conditionals are powerful, yet they do bring a configuration overhead which can be error prone or cumbersome. Instead of using the same configuration all over again we can simplify it by defining a conditional annotation with the configurations preset.

@ConditionalOnProperty(name = "message.publisher.type", havingValue = "azure")
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface AzurePublisherEnabled {
}

In the above example we can use the AzurePublisherEnabled annotation for the Azure only implementation.

Customization

So far conditionals have a wide range of options and ways to simplify them, but what if you want something more complex that the existing annotations cannot fulfil? In that case you can create your own conditions. The condition can be fulfilled based on information retrieved from the ConditionContext, whether that is environment variables or other aspects of the running program.

package com.gkatzioura.broker;

import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.type.AnnotatedTypeMetadata;

public class LocalBrokerCondition implements Condition {
    
    @Override
    public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
        //Logic
        return false;
    }
    
}

To use this conditional handler you just have to configure it in the annotation.

@Component
@Conditional(LocalBrokerCondition.class)
public class LocalPublisher implements MessagePublisher {

    @Override
    public void publish(String message) {

    }
}

Testing

So far so good, so what about testing? One option is to spin up a Spring context using @SpringBootTest and check if certain bean implementations have been instantiated. Another handy way is to use the ApplicationContextRunner.

    @Test
    public void testAzurePublisherIsEnabled() {
        ApplicationContextRunner runner = new ApplicationContextRunner()
                .withConfiguration(UserConfigurations.of(AzureStorageQueuePublisher.class));
        runner.withPropertyValues("message.publisher.type=azure")
                .run(context -> assertThat(context.getBean(MessagePublisher.class)).isInstanceOf(AzureStorageQueuePublisher.class));
    }

This is very elegant and removes the need to create a complex spring environment for unit tests.

So that’s it about conditionals, pretty sure you are gonna stumble on them on most spring based open source projects! Happy hacking 😉

Comments matter

I am an advocate of reading the source code. It’s something that I am vocal about in various tech discussions and part of my advice to the individuals I lead.

The source code is the source of truth. Thanks to reading the source code I was able to contribute to various open source projects. This made my daily life much easier on the projects I was already using on a day to day basis.

One of the things that struck me recently while browsing the Fluss codebase was that, apart from just reading the source code, I was actually reading the comments and getting help from them.
Just like many engineers throughout their career, I went through a snobbish phase towards comments. This could have been due to recommendations from various books perceived the wrong way, or some wrongly perceived industry consensus. You see, you can be dogmatic and say that if a codebase is clean enough you don’t need comments.
And this is where it hit me.
The codebases I tinker with after hours are clean. They are diligently written so that contributors can be productive and deliver those incremental changes to the software.
Understanding a codebase is a challenge and, as happens with most challenges, it ends up becoming a game. The process becomes a treasure hunt with the reward of that sweet `Eureka` moment.
And this is where things can go wrong. Provided you are after a contribution, reading the source code is a means to an end. Obviously you can read source code for educational purposes, but I am focusing on the case where the goal is an increment to an existing codebase.
And this is how I started appreciating those comments I encountered.

It is much easier to get into context

You can always see the use of off-heap memory and assume it is to remove the pressure on the GC due to a large dataset or due to performance, but this is a guess. A comment can help you sort this out. The code tells you the how but the comment can tell you the intention and the goal.
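As an illustration, here is a small hypothetical snippet (it is not taken from Fluss or any other codebase, and the buffer size and the stated reason are invented) where the comment carries the intention that the code alone cannot express.

```java
import java.nio.ByteBuffer;

class OffHeapIndexBuffer {

    // Hypothetical size, chosen only for the example.
    static final int BUFFER_BYTES = 64 * 1024;

    /*
     * Intent: this buffer is allocated off-heap on purpose. It lives for the
     * whole lifetime of the process and is handed to I/O calls, so keeping it
     * outside the Java heap spares the garbage collector from tracking and
     * moving it. Without this comment a reader sees allocateDirect() and can
     * only guess whether the reason is GC pressure, I/O or something else.
     */
    static ByteBuffer allocate() {
        return ByteBuffer.allocateDirect(BUFFER_BYTES);
    }

    public static void main(String[] args) {
        ByteBuffer buffer = allocate();
        System.out.println("direct=" + buffer.isDirect() + ", capacity=" + buffer.capacity());
    }
}
```

The method body is a one-liner; everything a new contributor needs to decide whether it is safe to change sits in the comment.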

You also get to have something compact

Instead of an external wiki or a note in a README.md you get both things in one file. That helps you avoid switching context. Switching context breaks your flow and makes it harder to connect the dots.

Comments make things more searchable

You can always go and type a keyword on GitHub and pray that a class or function is named accordingly. But this is very specific and assumes you already know how the codebase is structured. What if your search is based on context and a bigger picture? Comments, due to their nature, are flexible enough to encapsulate a part of the context and tell a story. From the tf-idf based search engines to the modern LLM tools, a comment is there to feed them with context and give results.

Overall I learned to love comments after I realised I was an actual user of them. The occasional treasure hunts are here to stay, but the real fun is in reaching the goal. So do the next guy a favour: put a simple, comprehensive comment where you reckon it is needed.

Asymmetric encryption using Java, GCP/AWS KMS

Public-key cryptography, or asymmetric cryptography, is something we use daily; take for example the TLS on this very website.

From Wikipedia:

In a public-key encryption system, anyone with a public key can encrypt a message, yielding a ciphertext, but only those who know the corresponding private key can decrypt the ciphertext to obtain the original message.
For example, a journalist can publish the public key of an encryption key pair on a web site so that sources can send secret messages to the news organization in ciphertext.


Wikipedia gives a good example. Another example:
You have a process sending data from your premises to another datacenter. Apart from encryption in transit you want to go an extra step and encrypt the contents, ensuring that only certain workloads in the datacenter can decrypt the data. This could be done with a symmetric key. The issue with a symmetric key is that both parties have access to the same key and are able to both encrypt and decrypt information. Both parties might be uneasy about sharing the same key, plus we have a one-directional flow: from the on-premises datacenter to the external datacenter.
This is where asymmetric encryption can be used. A public key encrypts the data before they get dispatched, and the receiver of the data holds the matching private key to decrypt them.

Let’s generate some RSA keys

#generate the private key
openssl genrsa -out keypair.pem 2048
#generate the public key
openssl rsa -in keypair.pem -pubout -out publickey.crt

We have a public key that we can use to encrypt data and a private key to use to decrypt the encrypted data.

The following Java snippet reads the keys, encrypts a string and decrypts it.

import javax.crypto.BadPaddingException;
import javax.crypto.Cipher;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.NoSuchPaddingException;
import javax.crypto.spec.OAEPParameterSpec;
import javax.crypto.spec.PSource;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.*;
import java.security.spec.InvalidKeySpecException;
import java.security.spec.MGF1ParameterSpec;
import java.security.spec.PKCS8EncodedKeySpec;
import java.security.spec.X509EncodedKeySpec;
import java.util.Base64;

public class Main {

    public static void main(String[] args) throws IOException, NoSuchAlgorithmException, InvalidKeySpecException, NoSuchPaddingException, InvalidKeyException, IllegalBlockSizeException, BadPaddingException, InvalidAlgorithmParameterException {
        final String privateKeyText;
        final String publicKeyText;

        try(InputStream privateKeyInputstream = new FileInputStream( "/path/to/keypair.pem");
            InputStream publicKeyInputStream = new FileInputStream("/path/to/publickey.crt");) {
            privateKeyText = new String(privateKeyInputstream.readAllBytes());
            publicKeyText = new String(publicKeyInputStream.readAllBytes());
        }

        String privateKeyBase64 = privateKeyText.replaceAll("\\n", "").replace("-----BEGIN PRIVATE KEY-----", "").replace("-----END PRIVATE KEY-----", "");
        String publicKeyBase64 = publicKeyText.replaceAll("\\n", "").replace("-----BEGIN PUBLIC KEY-----", "").replace("-----END PUBLIC KEY-----", "");

        byte[] privateKeyBytes = Base64.getDecoder().decode(privateKeyBase64);
        byte[] publicKeyBytes = Base64.getDecoder().decode(publicKeyBase64);

        KeyFactory keyFactory = KeyFactory.getInstance("RSA");

        PKCS8EncodedKeySpec privateKeySpec = new PKCS8EncodedKeySpec(privateKeyBytes);
        PrivateKey privateKey = keyFactory.generatePrivate(privateKeySpec);

        X509EncodedKeySpec publicKeySpec = new X509EncodedKeySpec(publicKeyBytes);
        PublicKey publicKey = keyFactory.generatePublic(publicKeySpec);

        String unEncryptedText = "un-encrypted text";
        byte[] message = unEncryptedText.getBytes(StandardCharsets.UTF_8);

        Cipher encryptCypher = Cipher.getInstance("RSA/ECB/OAEPWithSHA-256AndMGF1Padding");
        OAEPParameterSpec oaepParams =
                new OAEPParameterSpec(
                        "SHA-256", "MGF1", MGF1ParameterSpec.SHA256, PSource.PSpecified.DEFAULT);
        encryptCypher.init(Cipher.ENCRYPT_MODE, publicKey, oaepParams);
        encryptCypher.update(message);

        byte[] ciphertext = encryptCypher.doFinal();

        Cipher cipher =  Cipher.getInstance("RSA/ECB/OAEPWithSHA-256AndMGF1Padding");
        cipher.init(Cipher.DECRYPT_MODE, privateKey, oaepParams);
        cipher.update(ciphertext);

        byte[] decrypted = cipher.doFinal();
        String decryptedText = new String(decrypted, StandardCharsets.UTF_8);

        assert decryptedText.equals(unEncryptedText);
    }

}

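To wrap up the above example: we read the contents of the keys. The keys are stored in a Base64 format, thus we decode them and create the keys using the decoded bytes. Then using the cipher and the keys we encrypt and decrypt the payload. Note that PKCS8EncodedKeySpec expects a PKCS#8 key (header BEGIN PRIVATE KEY), which is what OpenSSL 3 emits. Older OpenSSL versions produce a PKCS#1 key (header BEGIN RSA PRIVATE KEY), which first has to be converted with openssl pkcs8 -topk8 -nocrypt.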
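If you just want to try the round trip without generating files with OpenSSL, a minimal sketch is to create a throwaway key pair in memory. It uses the same OAEP configuration as the snippet above; the class and method names are made up for the example, and a key generated this way is only suitable for experiments since it is lost when the process exits.

```java
import javax.crypto.Cipher;
import javax.crypto.spec.OAEPParameterSpec;
import javax.crypto.spec.PSource;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.spec.MGF1ParameterSpec;

class InMemoryRsaRoundTrip {

    static final String TRANSFORMATION = "RSA/ECB/OAEPWithSHA-256AndMGF1Padding";
    // SHA-256 for both the OAEP digest and the MGF1 mask generation function.
    static final OAEPParameterSpec OAEP_SHA256 = new OAEPParameterSpec(
            "SHA-256", "MGF1", MGF1ParameterSpec.SHA256, PSource.PSpecified.DEFAULT);

    // Generates a throwaway 2048-bit RSA key pair held only in memory.
    static KeyPair generateKeyPair() {
        try {
            KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
            generator.initialize(2048);
            return generator.generateKeyPair();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Encrypts with the public key, decrypts with the private key, returns the result.
    static String roundTrip(KeyPair keyPair, String plaintext) {
        try {
            Cipher encrypt = Cipher.getInstance(TRANSFORMATION);
            encrypt.init(Cipher.ENCRYPT_MODE, keyPair.getPublic(), OAEP_SHA256);
            byte[] ciphertext = encrypt.doFinal(plaintext.getBytes(StandardCharsets.UTF_8));

            Cipher decrypt = Cipher.getInstance(TRANSFORMATION);
            decrypt.init(Cipher.DECRYPT_MODE, keyPair.getPrivate(), OAEP_SHA256);
            return new String(decrypt.doFinal(ciphertext), StandardCharsets.UTF_8);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String decrypted = roundTrip(generateKeyPair(), "un-encrypted text");
        System.out.println(decrypted);
    }
}
```

Keep in mind that RSA encrypts only small payloads: with a 2048-bit key and OAEP with SHA-256 the plaintext can be at most 190 bytes. Larger payloads are usually encrypted with a symmetric key, and only that key is encrypted with RSA.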

On the public cloud we have KMS (Key Management Service).

We can use the AWS or the GCP KMS to create a private-public key pair.

Let’s start with the GCP example. Once we have created an asymmetric key in KMS, we can download its public key from the console.

The private key stays in KMS, so should we want to decrypt any data, it can only happen through the KMS API.

We shall place the public key on the classpath with the name gcp.crt.

We shall encrypt the data using the downloaded public key and decrypt it using the KMS API:

import com.google.cloud.kms.v1.*;
import com.google.protobuf.ByteString;

import javax.crypto.BadPaddingException;
import javax.crypto.Cipher;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.NoSuchPaddingException;
import javax.crypto.spec.OAEPParameterSpec;
import javax.crypto.spec.PSource;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.*;
import java.security.spec.InvalidKeySpecException;
import java.security.spec.MGF1ParameterSpec;
import java.security.spec.X509EncodedKeySpec;
import java.util.Base64;

public class GCPKMS {

    public static void main(String[] args) throws IOException, NoSuchAlgorithmException, InvalidKeySpecException, NoSuchPaddingException, InvalidKeyException, IllegalBlockSizeException, BadPaddingException, InvalidAlgorithmParameterException {
        try (KeyManagementServiceClient client = KeyManagementServiceClient.create()) {
            String name = "full-gcp-key-name";

            String unEncryptedText = "un-encrypted text";

            InputStream publicKeyInputStream = Main.class.getClassLoader().getResourceAsStream("gcp.crt");
            String publicKeyText = new String(publicKeyInputStream.readAllBytes());
            String publicKeyBase64 = publicKeyText.replaceAll("\\n", "").replace("-----BEGIN PUBLIC KEY-----", "").replace("-----END PUBLIC KEY-----", "");
            byte[] publicKeyBytes = Base64.getDecoder().decode(publicKeyBase64);

            X509EncodedKeySpec keySpec = new X509EncodedKeySpec(publicKeyBytes);
            java.security.PublicKey publicKey = KeyFactory.getInstance("RSA").generatePublic(keySpec);

            Cipher encryptCypher = Cipher.getInstance("RSA/ECB/OAEPWithSHA-256AndMGF1Padding");
            OAEPParameterSpec oaepParams =
                    new OAEPParameterSpec(
                            "SHA-256", "MGF1", MGF1ParameterSpec.SHA256, PSource.PSpecified.DEFAULT);
            encryptCypher.init(Cipher.ENCRYPT_MODE, publicKey, oaepParams);
            byte[] ciphertext = encryptCypher.doFinal(unEncryptedText.getBytes(StandardCharsets.UTF_8));


            AsymmetricDecryptResponse response = client.asymmetricDecrypt(AsymmetricDecryptRequest.newBuilder()
                    .setCiphertext(ByteString.copyFrom(ciphertext))
                    .setName(name)
                    .build());

            assert response.getPlaintext().toStringUtf8().equals(unEncryptedText);
        }
    }

}
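A note on the OAEPParameterSpec passed to init in the example above: the side that decrypts has to use the same digest for OAEP and for MGF1 as the side that encrypts. As far as I know, the JDK’s built-in provider pairs this transformation with SHA-1 for MGF1 unless told otherwise, which is why the parameters are spelled out; treat that as an assumption to verify on your own JDK. The sketch below uses an in-memory key pair instead of KMS, and OaepParametersSketch and its methods are hypothetical names. It prints what happens with provider defaults rather than assuming the outcome.

```java
import javax.crypto.Cipher;
import javax.crypto.spec.OAEPParameterSpec;
import javax.crypto.spec.PSource;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.spec.MGF1ParameterSpec;

class OaepParametersSketch {

    static final String TRANSFORMATION = "RSA/ECB/OAEPWithSHA-256AndMGF1Padding";

    // SHA-256 for both the OAEP digest and the MGF1 mask function, as in the examples of this post.
    static final OAEPParameterSpec SHA256_BOTH = new OAEPParameterSpec(
            "SHA-256", "MGF1", MGF1ParameterSpec.SHA256, PSource.PSpecified.DEFAULT);

    static KeyPair newKeyPair() throws GeneralSecurityException {
        KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
        generator.initialize(2048);
        return generator.generateKeyPair();
    }

    // Encrypts with the explicit parameters.
    static byte[] encrypt(KeyPair pair, String text) throws GeneralSecurityException {
        Cipher cipher = Cipher.getInstance(TRANSFORMATION);
        cipher.init(Cipher.ENCRYPT_MODE, pair.getPublic(), SHA256_BOTH);
        return cipher.doFinal(text.getBytes(StandardCharsets.UTF_8));
    }

    // Decrypts with the given parameters; null means "whatever the provider defaults to".
    static String decrypt(KeyPair pair, byte[] ciphertext, OAEPParameterSpec params)
            throws GeneralSecurityException {
        Cipher cipher = Cipher.getInstance(TRANSFORMATION);
        if (params == null) {
            cipher.init(Cipher.DECRYPT_MODE, pair.getPrivate());
        } else {
            cipher.init(Cipher.DECRYPT_MODE, pair.getPrivate(), params);
        }
        return new String(cipher.doFinal(ciphertext), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        KeyPair pair = newKeyPair();
        byte[] ciphertext = encrypt(pair, "un-encrypted text");

        // Matching parameters on both sides: the round trip works.
        System.out.println(decrypt(pair, ciphertext, SHA256_BOTH));

        // Provider defaults on the decrypting side: report the outcome instead of guessing it.
        try {
            System.out.println("defaults decrypted: " + decrypt(pair, ciphertext, null));
        } catch (GeneralSecurityException e) {
            System.out.println("defaults did not match: " + e);
        }
    }
}
```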

The flow with AWS KMS is pretty similar.

Once we have created the asymmetric key, we can download the public key from the console.

The key shall be placed on the classpath with the name aws.crt.

We shall encrypt the data using the downloaded public key and decrypt it using the KMS API:

import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kms.KmsClient;
import software.amazon.awssdk.services.kms.model.DecryptRequest;
import software.amazon.awssdk.services.kms.model.EncryptionAlgorithmSpec;

import javax.crypto.Cipher;
import javax.crypto.spec.OAEPParameterSpec;
import javax.crypto.spec.PSource;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.KeyFactory;
import java.security.spec.MGF1ParameterSpec;
import java.security.spec.X509EncodedKeySpec;
import java.util.Base64;

public class AWSKMS {

    public static void main(String[] args) throws Exception {
        String keyId = "kms-arn";

        KmsClient kmsClient = KmsClient.builder()
                .region(Region.of("us-east-1")) // the AWS region where the key lives
                .credentialsProvider(DefaultCredentialsProvider.builder()
                        .build())
                .build();

        String unEncryptedText = "un-encrypted text";

        InputStream publicKeyInputStream = Main.class.getClassLoader().getResourceAsStream("aws.crt");
        String publicKeyText = new String(publicKeyInputStream.readAllBytes());
        String publicKeyBase64 = publicKeyText.replaceAll("\\n", "").replace("-----BEGIN PUBLIC KEY-----", "").replace("-----END PUBLIC KEY-----", "");
        byte[] publicKeyBytes = Base64.getDecoder().decode(publicKeyBase64);

        X509EncodedKeySpec keySpec = new X509EncodedKeySpec(publicKeyBytes);
        java.security.PublicKey publicKey = KeyFactory.getInstance("RSA").generatePublic(keySpec);

        Cipher encryptCypher = Cipher.getInstance("RSA/ECB/OAEPWithSHA-256AndMGF1Padding");
        OAEPParameterSpec oaepParams =
                new OAEPParameterSpec(
                        "SHA-256", "MGF1", MGF1ParameterSpec.SHA256, PSource.PSpecified.DEFAULT);
        encryptCypher.init(Cipher.ENCRYPT_MODE, publicKey, oaepParams);
        byte[] ciphertext = encryptCypher.doFinal(unEncryptedText.getBytes(StandardCharsets.UTF_8));

        var resp = kmsClient.decrypt(DecryptRequest.builder()
                .keyId(keyId)
                .encryptionAlgorithm(EncryptionAlgorithmSpec.RSAES_OAEP_SHA_256)
                .ciphertextBlob(SdkBytes.fromByteArray(ciphertext))
                .build());

        assert resp.plaintext().asUtf8String().equals(unEncryptedText);
    }

}

That’s it! We did asymmetric encryption and decryption locally using the RSA keys we created. We did asymmetric encryption using a GCP KMS public key and decrypted using the GCP KMS API. Lastly, we did asymmetric encryption using an AWS KMS public key and decrypted using the AWS KMS API.

Using the Google Cloud Platform Schema registry with Java

GCP’s Pub/Sub is one of my favourite services for streaming data: it is seamless and comes without operational overhead. Pub/Sub comes with GCP Schemas, which can be used to define the schema of the data flowing through the topics.

Interacting with Pub/Sub Schemas is straightforward; you only have to include the Pub/Sub client dependency:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-pubsub</artifactId>
      <version>1.134.0</version>
    </dependency>

Then we can proceed to list the schemas:

    String projectId = "your-gcp-project-id";

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {
      var response = schemaServiceClient.listSchemas(ProjectName.of(projectId));

      for (var s : response.iterateAll()) {
        System.out.println(s.getName()+ " "+s.getRevisionId());

        Schema schema = schemaServiceClient.getSchema(s.getName());
        System.out.println(schema.getDefinition());
      }

    }

Take note that when listing the schemas you just fetch a listing of the names; the actual schema definition is not fetched even though the same model is used.

To fetch a schema’s definition you need to get the schema through a separate getSchema call.

Through the API you can also submit a schema:

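      String projectId = "your-gcp-project-id";
      String schemaId = "example-schema";

      String schemaDefinition = "{ \"type\": \"record\", \"name\": \"TestRecord\", \"fields\": [ { \"name\": \"name\", \"type\": \"string\" } ] }";
      CreateSchemaRequest createSchemaRequest = CreateSchemaRequest.newBuilder()
          // The parent is the project resource name (projects/{project}), not the bare project id.
          .setParent(ProjectName.of(projectId).toString())
          // The id the new schema gets within the project.
          .setSchemaId(schemaId)
          .setSchema(Schema.newBuilder()
              .setName(SchemaName.of(projectId, schemaId).toString())
              .setType(Schema.Type.AVRO)
              .setDefinition(schemaDefinition)
              .build())
          .build();

      // Building the request does not send it; createSchema performs the actual call.
      try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {
        Schema created = schemaServiceClient.createSchema(createSchemaRequest);
        System.out.println("Schema created: " + created.getName());
      }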

Now you might be wondering what happens when you publish a message that does not conform to the schema on a Pub/Sub topic bound to that schema. In that case the request to publish the message will fail. The validation happens on the server side.

    Publisher publisher = Publisher.newBuilder(TopicName.parse("projects/test-project/topics/test-pub-sub")).build();
    var mes = publisher.publish(PubsubMessage.newBuilder().setData(ByteString.copyFrom("invalid-format".getBytes())).build()).get();
...
Caused by: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid data in message: Message failed schema validation.
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:92)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98)
	

When you subscribe to a topic with a schema attached, every message received carries attributes describing which schema was used to create the message.

MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          String name = message.getAttributesMap().get("googclient_schemaname");
          String revision = message.getAttributesMap().get("googclient_schemarevisionid");
        }
...
}

The schema name is the full schema name, including the project id; the revision attribute contains the revision id of the schema. In case of multiple revisions you can fetch the revision of interest by specifying it in the getSchema request. Receiving the schema reference with each message is handy, since it lets you handle the various messages received by a subscription dynamically.
For example, you can read an Avro message received from a subscription by fetching the schema on demand.

MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
    String name = message.getAttributesMap().get("googclient_schemaname");

    Schema schema = schemaServiceClient.getSchema(SchemaName.parse(name));
    if(schema.getType().equals(Type.AVRO)) {
      String definition = schema.getDefinition();
      org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(definition);

      // The reader needs the schema the message was written with in order to decode it.
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(avroSchema);

      // Assumes the topic uses binary encoding; a JSON-encoded topic needs a JSON decoder instead.
      try(InputStream inputStream = new ByteArrayInputStream(message.getData().toByteArray())) {
        BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(inputStream, null);
        GenericRecord genericRecord = datumReader.read(null, decoder);
      } catch (IOException e) {
        // receiveMessage does not declare checked exceptions, so wrap it.
        throw new UncheckedIOException(e);
      }
    }

};
...
}
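Fetching the schema on demand means one remote call per message unless the result is kept around. A minimal sketch of memoizing the definition per schema name and revision follows. SchemaDefinitionCache and its loader function are hypothetical: the loader stands in for whatever wraps the getSchema call, and here it is a stub so the snippet runs on the JDK alone.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;

class SchemaDefinitionCache {

    // Cache key: the full schema name plus the revision id, as carried in the message attributes.
    record SchemaKey(String name, String revisionId) {}

    private final Map<SchemaKey, String> definitions = new ConcurrentHashMap<>();

    // Hypothetical loader: (name, revisionId) -> schema definition, e.g. a wrapper around getSchema.
    private final BiFunction<String, String, String> loader;

    SchemaDefinitionCache(BiFunction<String, String, String> loader) {
        this.loader = loader;
    }

    // Calls the loader only the first time a (name, revision) pair is seen.
    String definitionFor(String name, String revisionId) {
        return definitions.computeIfAbsent(
                new SchemaKey(name, revisionId),
                key -> loader.apply(key.name(), key.revisionId()));
    }

    public static void main(String[] args) {
        AtomicInteger remoteCalls = new AtomicInteger();
        SchemaDefinitionCache cache = new SchemaDefinitionCache((name, revision) -> {
            remoteCalls.incrementAndGet();
            return "{\"type\":\"string\"}"; // placeholder definition, not a real registry response
        });

        for (int i = 0; i < 1000; i++) {
            cache.definitionFor("projects/test-project/schemas/example-schema", "113aca6b");
        }
        System.out.println("remote calls: " + remoteCalls.get()); // prints "remote calls: 1"
    }
}
```

Inside the receiver, the name and revision would come from the googclient_schemaname and googclient_schemarevisionid attributes.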

You might want your codebase to benefit further from the registry and even generate the corresponding classes for the various schemas. We have done that in a previous post.

In that case we can use the gcp-schemas-maven-plugin.

You can combine the plugin with a schema model generation plugin.
For example, you can download Avro schemas from a GCP project and generate the Java classes from those Avro schemas.

      <plugin>
        <groupId>io.github.gkatzioura.gcp</groupId>
        <artifactId>gcp-schemas-maven-plugin</artifactId>
        <version>1.0</version>
        <executions>
          <execution>
            <id>one</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>download</goal>
            </goals>
            <configuration>
              <project>gcp-project-1</project>
              <outputDirectory>src/main/avro</outputDirectory>
              <schemaType>AVRO</schemaType>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>1.12.0</version>
        <executions>
          <execution>
            <phase>generate-sources</phase>
            <goals>
              <goal>schema</goal>
            </goals>
            <configuration>
              <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
              <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
            </configuration>
          </execution>
        </executions>
      </plugin>

If you want to be selective about the schemas to use, you can even specify the exact schema names or versions:

      <plugin>
        <groupId>io.github.gkatzioura.gcp</groupId>
        <artifactId>gcp-schemas-maven-plugin</artifactId>
        <version>1.0</version>
        <executions>
          <execution>
            <id>one</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>download</goal>
            </goals>
            <configuration>
              <project>gcp-project-1</project>
              <outputDirectory>src/main/avro</outputDirectory>
              <schemaType>AVRO</schemaType>
              <subjectPatterns>schema1,schema2,schema3</subjectPatterns>
              <versions>113aca6b,69965687,e6dc8d46</versions>              
            </configuration>
          </execution>
        </executions>
      </plugin>

That’s it, I hope this streamlines the development of your data pipelines!

Flinking Out: Getting started

I’ve been using Flink for some time and I have started to prefer it over GCP Dataflow. There are various reasons for that and I will get into the details in future blogs.

Regardless, I like how easy it is to deploy it to Kubernetes, as well as to run it in various modes (streaming vs batch). It’s great to have such a powerful framework which can be applied to a variety of cases.

Many Flink tutorials focus on a no-code approach mainly driven by SQL. I would like to do something different, for example making our own Sinks and Sources, and taking a more Java-focused approach.

It all starts with our initial project, which is what this blog is about.

I will use Maven on this one and take advantage of the Maven archetype:

mvn archetype:generate                                 \
      -DarchetypeGroupId=org.apache.flink              \
      -DarchetypeArtifactId=flink-quickstart-java      \
      -DarchetypeVersion=1.20.0                        \
      -DgroupId=com.egkatzioura.flink                  \
      -DartifactId=flinking-around                     \
      -Dversion=1.0.0-SNAPSHOT
    

After running this, we get a simple class that runs a job.

package com.egkatzioura.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataStreamJob {

	public static void main(String[] args) throws Exception {
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		env.execute("Flink Java API Skeleton");
	}
}

I will enhance this example by emitting some integers and filtering out the ones that exceed a threshold. For that I will create a filter function: given a threshold, the function removes the elements that exceed it.

I will start with a test, therefore I have to update some of the dependencies.

...
	<dependencies>
		<dependency>
			<groupId>org.junit.jupiter</groupId>
			<artifactId>junit-jupiter-engine</artifactId>
			<version>5.9.1</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-test-utils</artifactId>
			<version>${flink.version}</version>
			<scope>test</scope>
		</dependency>
...
	</dependencies>

Next we shall implement our test class. We will start with a failing test, which we will turn into a passing one by implementing the missing functionality.

package com.egkatzioura.flink;

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.CloseableIterator;
import org.junit.ClassRule;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.com.google.common.collect.Lists;

import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

class FilterAboveThresholdTest {

    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(2)
                            .setNumberTaskManagers(1)
                            .build());

    @Test
    void filterAboveTen() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        var dataStream = env.fromData(1,12,3,4,15,6);

        try (CloseableIterator<Integer> iterator = dataStream.executeAndCollect()) {
            List<Integer> results = Lists.newArrayList(iterator);
            assertThat(results)
                    .containsExactlyInAnyOrder(1,3,4,6);
        }
    }

}

As expected, our test fails, so we need to implement the filter.

package com.egkatzioura.flink;

import org.apache.flink.api.common.functions.FilterFunction;

public class RemoveAboveThreshold implements FilterFunction<Integer> {

    private final Integer threshold;

    public RemoveAboveThreshold(Integer threshold) {
        this.threshold = threshold;
    }

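    @Override
    public boolean filter(Integer integer) throws Exception {
        // Returning true keeps the element: values up to and including the threshold stay,
        // only values that exceed it are removed.
        return integer <= threshold;
    }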

}
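The keep/drop rule itself can be checked without a cluster. The sketch below is plain Java with no Flink on the classpath: java.util.function.Predicate stands in for Flink’s FilterFunction, and ThresholdRuleSketch is a made-up name. It assumes that "exceeds" means strictly greater, so a value equal to the threshold is kept.

```java
import java.util.List;
import java.util.function.Predicate;

class ThresholdRuleSketch {

    // Stand-in for the Flink filter: returning true means "keep the element".
    static Predicate<Integer> atOrBelow(int threshold) {
        return value -> value <= threshold;
    }

    // Applies the rule to a list, the way the filter is applied to the stream.
    static List<Integer> keep(List<Integer> values, int threshold) {
        return values.stream().filter(atOrBelow(threshold)).toList();
    }

    public static void main(String[] args) {
        System.out.println(keep(List.of(1, 12, 3, 4, 15, 6), 10)); // prints [1, 3, 4, 6]
        System.out.println(keep(List.of(9, 10, 11), 10));          // prints [9, 10]
    }
}
```

The first call uses the same input as the Flink test, the second one pins down the boundary value.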

Now let’s integrate the filter with our test.

package com.egkatzioura.flink;

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.CloseableIterator;
import org.junit.ClassRule;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.com.google.common.collect.Lists;

import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

class RemoveAboveThresholdTest {

    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(2)
                            .setNumberTaskManagers(1)
                            .build());

    @Test
    void filterAboveTen() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        var dataStream = env.fromData(1,12,3,4,15,6);

        RemoveAboveThreshold removeAboveThreshold = new RemoveAboveThreshold(10);
        try (CloseableIterator<Integer> iterator = dataStream.filter(removeAboveThreshold).executeAndCollect()) {
            List<Integer> results = Lists.newArrayList(iterator);
            assertThat(results)
                    .containsExactlyInAnyOrder(1,3,4,6);
        }
    }

}

That’s it, we set up our first Flink project and followed a TDD approach.

Java Concurrency: HttpURLConnection on Virtual Threads

Virtual threads have been around for a while. Overall the talk of the town is how asynchronous applications will benefit from virtual threads.
Why is asynchronous such a big deal? Mainly because blocking code is a pain: you keep a thread busy waiting for a response.
What is Asynchronous IO?
I will go for a full explanation in other blogs, but here’s a TL;DR.
Network calls are dispatched from your Java app to the native networking implementation through system calls. Bytes are exchanged and results are sent back to the application. With blocking I/O, the calling thread blocks while this is being done, until the results arrive.

Take for example an HTTP request: you block for a response until all the interactions that go over the wire are finished, including creating the socket, establishing the connection, sending the request and receiving the response.

There is an alternative: you can dispatch the network requests without blocking and pick up the results of the network interactions once they are complete. Overall your codebase for network calls will be a set of chained callbacks.
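To make the chained callbacks concrete, here is a minimal sketch using the JDK’s java.net.http.HttpClient and its sendAsync method. It is only an illustration: CallbackChainSketch is a made-up name, and the request goes to a throwaway local server (the JDK’s com.sun.net.httpserver.HttpServer) so that the snippet does not depend on an external site.

```java
import com.sun.net.httpserver.HttpServer;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

class CallbackChainSketch {

    // Starts a local server on a free port that answers every request with "hello".
    static HttpServer startServer() throws Exception {
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        server.createContext("/", exchange -> {
            byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(200, body.length);
            try (var out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();
        return server;
    }

    // Nothing blocks here: each step is a callback that runs when the previous one completes.
    static CompletableFuture<String> fetchUpperCased(HttpClient client, URI uri) {
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        return client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
                .thenApply(HttpResponse::body)
                .thenApply(String::toUpperCase);
    }

    static String demo() throws Exception {
        HttpServer server = startServer();
        try {
            URI uri = URI.create("http://127.0.0.1:" + server.getAddress().getPort() + "/");
            HttpClient client = HttpClient.newBuilder()
                    .version(HttpClient.Version.HTTP_1_1)
                    .build();
            // get() blocks only at the very end, to hand the result back to the caller.
            return fetchUpperCased(client, uri).get();
        } finally {
            server.stop(0);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // prints "HELLO"
    }
}
```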

Java has NIO, the networking API that takes advantage of this functionality. Netty is a library that uses NIO behind the scenes, and many reactive applications are based on it.

Virtual threads are run by a pool of native threads. On a network call the virtual thread can be parked, which frees the native thread, and resumed once the network interaction is complete. Certain blocking networking APIs in Java switch to a non-blocking mode when they run in a virtual thread.

An example is HttpURLConnection.

This code block runs on the main thread, thus a native thread:

    public static void main(String[] args) {
        try {
            var connection = (HttpURLConnection) new URL("https://google.com").openConnection();
            connection.setRequestMethod("GET");
            connection.connect();

            byte[] bytes = connection.getInputStream().readAllBytes();

            System.out.println(new String(bytes));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

Behind the scenes it behaves very differently from this block, which runs in a virtual thread:

        Thread vThread = Thread.ofVirtual().start(() -> {
            try {
                var connection = (HttpURLConnection) new URL("https://google.com").openConnection();
                connection.setRequestMethod("GET");
                connection.connect();

                byte[] bytes = connection.getInputStream().readAllBytes();

                System.out.println(new String(bytes));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });

        vThread.join();

As you know from previous blogs, I have a tendency to dive deeper.

With some digging we end up at the creation of the NioSocketImpl. This is where the check is made on whether the call is executed in a virtual thread, in which case the socket is made non-blocking.

    private void configureNonBlockingIfNeeded(FileDescriptor fd, boolean timed)
        throws IOException
    {
        if (!nonBlocking
            && (timed || Thread.currentThread().isVirtual())) {
            assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
            IOUtil.configureBlocking(fd, false);
            nonBlocking = true;
        }
    }
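The Thread.currentThread().isVirtual() check used above is public API (Java 21 and later), so we can see what it reports from our own code. A small sketch follows; IsVirtualSketch is just an illustrative name.

```java
class IsVirtualSketch {

    // Runs a probe on either a virtual or a platform thread and reports what isVirtual() saw.
    static boolean observedVirtual(boolean useVirtual) throws InterruptedException {
        boolean[] observed = new boolean[1];
        Runnable probe = () -> observed[0] = Thread.currentThread().isVirtual();

        Thread thread = useVirtual
                ? Thread.ofVirtual().start(probe)
                : Thread.ofPlatform().start(probe);
        thread.join(); // after join the write made by the probe is visible here

        return observed[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("platform thread: " + observedVirtual(false)); // prints "platform thread: false"
        System.out.println("virtual thread: " + observedVirtual(true));   // prints "virtual thread: true"
    }
}
```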

NIO has a notification mechanism, the Poller, which continuously checks for completed I/O interactions.

    private void pollerLoop() {
        try {
            for (;;) {
                poll(-1);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

Once an update is received, the virtual thread will be un-parked and continue from where it left off.

    @Override
    void unpark() {
        if (!getAndSetParkPermit(true) && currentThread() != this) {
            int s = state();

            // unparked while parked
            if ((s == PARKED || s == TIMED_PARKED) && compareAndSetState(s, UNPARKED)) {
                submitRunContinuation();
                return;
            }
...

On the next I/O call the virtual thread will be parked again.

    private void park(FileDescriptor fd, int event, long nanos) throws IOException {
        Thread t = Thread.currentThread();
        if (t.isVirtual()) {
            Poller.poll(fdVal(fd), event, nanos, this::isOpen);
            if (t.isInterrupted()) {
                throw new InterruptedIOException();

So think how this plays out:

  • Code blocks inside virtual threads are executed by native threads
  • When an I/O call is initiated, the network APIs make it a non-blocking one, since it runs on a virtual thread
  • Instead of blocking and waiting for the I/O call to finish, the virtual thread is parked
  • The native thread that was running the virtual thread continues with other work
  • Once the I/O is finished, the Poller picks up the event and un-parks the virtual thread
  • A native thread resumes executing the code block in the virtual thread
...
() -> {
    try {
       var connection = (HttpURLConnection) new URL("https://google.com").openConnection();
       connection.setRequestMethod("GET");
       connection.connect(); // Open the NIO socket, set it as non-blocking and park the virtual thread while waiting for the connection to be established

       byte[] bytes = connection.getInputStream().readAllBytes(); // The Poller un-parked the virtual thread; proceed to the next network call and park the virtual thread once more

       System.out.println(new String(bytes)); // The call finished and the virtual thread has been un-parked
    } catch (IOException e) {
       throw new RuntimeException(e);
    }
}
...

The way the described mechanism is wrapped together gives the sense that the execution is sequential, just like the code above reads. Behind the scenes it’s a combination of the NIO APIs and the native threads that the virtual threads are executed upon.
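To try this out without depending on an external site, here is a minimal sketch that issues several HttpURLConnection requests, each on its own virtual thread, against a throwaway local server (the JDK’s com.sun.net.httpserver.HttpServer). VirtualThreadHttpSketch and its methods are made-up names, and Java 21 or later is assumed. The request code reads as plain blocking code; the parking and un-parking happens underneath.

```java
import com.sun.net.httpserver.HttpServer;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class VirtualThreadHttpSketch {

    // Plain blocking-style code, the same shape as the example in this post.
    static String get(URI uri) throws Exception {
        var connection = (HttpURLConnection) uri.toURL().openConnection();
        connection.setRequestMethod("GET");
        try (var in = connection.getInputStream()) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        }
    }

    // Starts a local server, fires the requests on virtual threads and collects the bodies.
    static List<String> fetchAll(int requests) throws Exception {
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        server.createContext("/", exchange -> {
            byte[] body = "ok".getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(200, body.length);
            try (var out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();

        URI uri = URI.create("http://127.0.0.1:" + server.getAddress().getPort() + "/");

        // One new virtual thread per submitted task; close() waits for the tasks to finish.
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < requests; i++) {
                futures.add(executor.submit(() -> get(uri)));
            }
            List<String> bodies = new ArrayList<>();
            for (Future<String> future : futures) {
                bodies.add(future.get());
            }
            return bodies;
        } finally {
            server.stop(0);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(fetchAll(3)); // prints [ok, ok, ok]
    }
}
```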