Skip to main content
Version: edge

The tcp Connector

The tcp connector allows TCP-based clients and servers to be integrated with tremor.

Configuration

Client

config.troy
define connector `tcp-out` from tcp_client
with
codec = "yaml",
postprocessors = ["base64"],
config = {
"url": "localhost:4242",

# Optional Transport Level Security configuration
# "tls" = { ... },

# Optional tuning of the Nagle algorithm ( default: true )
# - By default no delay is preferred
# "no_delay" = false,

# Data buffer size ( default: 8K, limits maximum message size )
# "buf_size" = "16K",
}
end;

Server

config.troy
define connector `tcp-in` from tcp_server
with
codec = "string",
config = {
"url": "localhost:4242",

# Optional Transport Level Security configuration
# "tls" = { ... }

# Data buffer size ( default: 8K, limits maximum message size )
# "buf_size" = "16K",
}
end;

TCP configuration example

This is a relatively simple client server system that replays JSON formatted lines of data from a text file over TCP to a server. The JSON data is transformed to base64 encoded YAML before sending to the TCP server. The server receives TCP datagrams and writes them to a log file encoding the log entries as naked string data.

The client and server are implemented as treomr flows.

A high level summary of the overall flow:

read line by line
base64 encoded YAML
receive TCP datagrams
string encoded data
JSON File
TCP Client
TCP Server
select event from in into out
Log file

JSON file over TCP as base64 encoded YAML

A tremor flow that consumes JSON formatted data line by line from a file and sends each event as a base64 encoded YAML formatted event via TCP to a TCP service.

High level flow summary:

read line by line
base64 encoded YAML
JSON File
TCP Client
TCP Server

In deployable form

config.troy
define flow main
flow
use tremor::connectors;
use integration;


define connector in from file
with codec = "json",
preprocessors = ["lines"],
config = {
"path": "in.json",
"mode": "read"
}
end;

define connector `tcp-out` from tcp_client
with
codec = "yaml",
postprocessors = ["base64"],
config = {
"url": "localhost:4242",
}
end;

create pipeline main from integration::out_or_exit;
create connector in;
create connector `tcp-out`;
create connector exit from connectors::exit;

connect /connector/in/out to /pipeline/main/in;
connect /pipeline/main/out to /connector/`tcp-out`/in;
connect /pipeline/main/exit to /connector/exit/in;
end;

deploy flow main;

TCP server that logs base64 encoded YAML as one string per line to a file

A tremor flow that consumes TCP datagram events and writes them to a file encoding as raw strings.

High level flow summary:

receive TCP datagrams
string encoded data
TCP Server
select event from in into out
Log file
config.troy
define flow server
flow
use integration;
use tremor::pipelines;
use tremor::connectors;

define connector tcp_server from tcp_server
with
preprocessors = ["lines"],
postprocessors = ["lines"],
codec = "json",
config = {
"url": "0.0.0.0:65535",
"buf_size": 1024,
}
end;

create connector server_out from integration::write_file
with
file = "server_out.log"
end;
create connector stdio from connectors::console;
create connector tcp_server;

create pipeline server_side from pipelines::passthrough;
create pipeline debug from pipelines::passthrough;

# flow from tcp_server to file
connect /connector/tcp_server to /pipeline/server_side;
connect /connector/tcp_server/err to /pipeline/debug;
connect /pipeline/server_side to /connector/server_out;

# aaaand echo it back
connect /pipeline/server_side to /connector/tcp_server;

# debugging
connect /pipeline/debug to /connector/stdio;
end;

define flow client
flow
use integration;
use tremor::pipelines;
use tremor::connectors;

define connector tcp_client from tcp_client
with
preprocessors = ["lines"],
postprocessors = ["lines"],
codec = "json",
config = {
"url": "127.0.0.1:65535",
"no_delay": false,
"buf_size": 1024,
},
reconnect = {
"retry": {
"interval_ms": 100,
"growth_rate": 2,
"max_retries": 3,
}
}
end;

create connector in from integration::read_file;
create connector client_out from integration::write_file
with
file = "client_out.log"
end;
create connector exit from integration::exit;
create connector stdio from connectors::console;
create connector tcp_client;

create pipeline to_client from pipelines::passthrough;
create pipeline from_client from integration::out_or_exit;
create pipeline debug from pipelines::passthrough;

connect /connector/in to /pipeline/to_client;
connect /connector/in/err to /pipeline/debug;
connect /pipeline/to_client to /connector/tcp_client;

# send out any responses to the client to file `client_out.log`
connect /connector/tcp_client to /pipeline/from_client;
connect /connector/tcp_client/err to /pipeline/debug;
connect /pipeline/from_client/out to /connector/client_out;
connect /pipeline/from_client/exit to /connector/exit;

# debugging
connect /pipeline/debug to /connector/stdio;

end;

deploy flow server;
deploy flow client;

Notes

Running as an integration test

This is how we run this test sceanario within our integration test suite.

$ export TREMOR_PATH=/path/to/tremor-runtime/tremor-script/lib:/path/to/tremor-runtime/tremor-cli/tests/lib
$ tremor test integration .

Running as long running service

The logic can be used as starting point for your own client or service via tremor server run.

$ export TREMOR_PATH=/path/to/tremor-runtime/tremor-script/lib:/path/to/tremor-runtime/tremor-cli/tests/lib
$ tremor server run config.troy

Running as a long running service, with pretty printed JSON output

During development, pretty printing the JSON output on standard output might be useful.

We typically use the wonderful jq for this purpose

$ export TREMOR_PATH=/path/to/tremor-runtime/tremor-script/lib:/path/to/tremor-runtime/tremor-cli/tests/lib
$ tremor server run config.troy | jq

Exercises

  • Modularise the solution allowing the following combinations
    • Deploy the client and server in separate tremor instances
    • Deploy the client and server in a single tremor instance
    • Use the msgpack codec instead of base64 encoded YAML
    • Unpack the base64 encoded YAML and write to a log file as line-delimited JSON
    • Add compression and decmopression