13#include "../asrtl/asrt_assert.h"
14#include "../asrtl/log.h"
15#include "../asrtl/util.h"
16#include "../asrtlpp/callback.hpp"
17#include "../asrtlpp/task.hpp"
18#include "../asrtlpp/util.hpp"
19#include "../asrtr/stream.h"
20#include "./task_unit.hpp"
25template <
typename T >
31 static constexpr auto tag = ASRT_STRM_FIELD_U8;
32 static constexpr auto size = 1;
/// Store the single byte `v` at the write cursor `p` and advance the cursor
/// by one byte. (The listing line number fused into this line was removed.)
static void encode( uint8_t*& p, uint8_t v )
{
    *p = v;
    ++p;
}
39 static constexpr auto tag = ASRT_STRM_FIELD_U16;
40 static constexpr auto size = 2;
41 static void encode( uint8_t*& p, uint16_t v ) { asrt_add_u16( &p, v ); }
47 static constexpr auto tag = ASRT_STRM_FIELD_U32;
48 static constexpr auto size = 4;
49 static void encode( uint8_t*& p, uint32_t v ) { asrt_add_u32( &p, v ); }
55 static constexpr auto tag = ASRT_STRM_FIELD_I8;
56 static constexpr auto size = 1;
/// Store the signed byte `v` at the write cursor `p` and advance the cursor
/// by one byte. The value is converted to uint8_t first, so negative values
/// are written as their modulo-256 representation (e.g. -1 becomes 0xFF).
static void encode( uint8_t*& p, int8_t v )
{
    *p = static_cast< uint8_t >( v );
    ++p;
}
63 static constexpr auto tag = ASRT_STRM_FIELD_I16;
64 static constexpr auto size = 2;
65 static void encode( uint8_t*& p, int16_t v )
67 asrt_add_u16( &p,
static_cast< uint16_t
>( v ) );
74 static constexpr auto tag = ASRT_STRM_FIELD_I32;
75 static constexpr auto size = 4;
76 static void encode( uint8_t*& p, int32_t v ) { asrt_add_i32( &p, v ); }
82 static constexpr auto tag = ASRT_STRM_FIELD_FLOAT;
83 static constexpr auto size = 4;
84 static void encode( uint8_t*& p,
float v )
87 std::memcpy( &u, &v, 4 );
88 asrt_add_u32( &p, u );
95 static constexpr auto tag = ASRT_STRM_FIELD_BOOL;
96 static constexpr auto size = 1;
/// Store the boolean `v` at the write cursor `p` as a single byte
/// (1 for true, 0 for false) and advance the cursor by one byte.
static void encode( uint8_t*& p, bool v )
{
    *p = v ? 1 : 0;
    ++p;
}
101ASRT_NODISCARD
inline status
init( ref< asrt_stream_client > client,
asrt_node& prev )
103 return asrt_stream_client_init( client, &prev );
109 ref< asrt_stream_client > client,
111 enum asrt_strm_field_type_e
const* fields,
115 return asrt_stream_client_define(
116 client, schema_id, fields, field_count, done_cb.fn, done_cb.ptr );
120ASRT_NODISCARD
inline status
emit(
121 ref< asrt_stream_client > client,
127 return asrt_stream_client_emit(
128 client, schema_id, data, data_size, done_cb.fn, done_cb.ptr );
132ASRT_NODISCARD
inline status
reset( ref< asrt_stream_client > client )
134 return asrt_stream_client_reset( client );
138inline void deinit( ref< asrt_stream_client > client )
140 asrt_stream_client_deinit( client );
148template <
typename... Ts >
155 ref< asrt_stream_client > client,
159 , _schema_id( schema_id )
161 auto s =
define( _client, _schema_id, fields,
sizeof...( Ts ), done_cb );
162 if ( s != ASRT_SUCCESS ) {
163 ASRT_ERR_LOG(
"asrt_stream_schema",
"define failed" );
164 ASRT_ASSERT(
false );
175 stream_schema( stream_schema
const& ) =
delete;
176 stream_schema& operator=( stream_schema
const& ) =
delete;
178 stream_schema( stream_schema&& o ) noexcept
179 : _client( o._client )
180 , _schema_id( o._schema_id )
188 _schema_id = o._schema_id;
196 uint8_t* p = _emit_buf;
198 return asrt::emit( _client, _schema_id, _emit_buf, emit_size, done_cb );
202 ASRT_NODISCARD status
205 return asrt::emit( _client, _schema_id, buf, emit_size, done_cb );
215 uint8_t _emit_buf[emit_size > 0 ? emit_size : 1];
219template <
typename... Ts >
220struct _stream_define_ctx
222 using completion_signatures = ecor::completion_signatures<
223 ecor::set_value_t( stream_schema< Ts... > ),
224 ecor::set_error_t( status ) >;
229 template <
typename OP >
232 auto cb = +[](
void* ptr,
enum asrt_status s ) {
233 auto& o = *
static_cast< OP*
>( ptr );
234 if ( s == ASRT_SUCCESS )
235 o.receiver.set_value(
236 stream_schema< Ts... >{ o.ctx.client, o.ctx.schema_id } );
238 o.receiver.set_error( s );
243 stream_schema< Ts... >::fields,
246 if ( s != ASRT_SUCCESS )
247 op.receiver.set_error( s );
254template <
typename... Ts >
259template <
typename... Ts >
267template <
typename... Ts >
268struct _stream_emit_ctx
270 using completion_signatures =
271 ecor::completion_signatures< ecor::set_value_t(), ecor::set_error_t( status ) >;
273 stream_schema< Ts... >* schema;
274 uint8_t buf[stream_schema< Ts... >::emit_size];
276 template <
typename OP >
279 auto cb = +[](
void* ptr,
enum asrt_status s ) {
280 auto& o = *
static_cast< OP*
>( ptr );
281 if ( s == ASRT_SUCCESS )
282 o.receiver.set_value();
284 o.receiver.set_error( s );
286 auto st = schema->emit_raw( buf, { cb, &op } );
287 if ( st != ASRT_SUCCESS )
288 op.receiver.set_error( st );
295template <
typename... Ts >
300template <
typename... Ts >
303 _stream_emit_ctx< Ts... > ctx{ &schema, {} };
304 uint8_t* p = ctx.buf;
Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee ...
Definition: callback.hpp:17
ASRT_NODISCARD status define(ref< asrt_stream_client > client, uint8_t schema_id, enum asrt_strm_field_type_e const *fields, uint8_t field_count, callback< asrt_stream_done_cb > done_cb)
Send a DEFINE message registering schema_id with the given fields.
Definition: stream.hpp:108
ASRT_NODISCARD status emit(ref< asrt_stream_client > client, uint8_t schema_id, uint8_t const *data, uint16_t data_size, callback< asrt_stream_done_cb > done_cb)
Send one DATA record for schema_id.
Definition: stream.hpp:120
ASRT_NODISCARD status reset(ref< asrt_stream_client > client)
Clear all pending state on the client (e.g. at a test boundary).
Definition: stream.hpp:132
ASRT_NODISCARD status start(ref< asrt_controller > c, callback< asrt_init_callback > cb, uint32_t timeout)
Begin the protocol handshake; cb is invoked once the target responds or the operation times out.
Definition: controller.hpp:36
ASRT_NODISCARD enum asrt_status init(ref< asrt_cntr_assm > assm, asrt_allocator alloc)
Initialise the controller assembly — wires controller, diag, param, collect and stream channels.
Definition: cntr_assm.hpp:25
ecor::sender_from< _stream_emit_ctx< Ts... > > stream_emit_sender
Sender for co_await emit(schema, args...).
Definition: stream.hpp:296
void deinit(ref< asrt_cntr_assm > assm)
Release all resources owned by the assembly.
Definition: cntr_assm.hpp:52
ecor::sender_from< _stream_define_ctx< Ts... > > stream_define_sender
Sender for co_await define<Ts...>(client, schema_id).
Definition: stream.hpp:255
A type-erasing wrapper for C-style callback + void* pairs.
Definition: callback.hpp:27
Compile-time typed stream schema.
Definition: stream.hpp:150
ASRT_NODISCARD status emit(Ts... args, callback< asrt_stream_done_cb > done_cb)
Encode args in-place and send a DATA message using the underlying emit function.
Definition: stream.hpp:194
stream_schema(ref< asrt_stream_client > client, uint8_t schema_id, callback< asrt_stream_done_cb > done_cb)
XXX: error handling in constructor is bad.
Definition: stream.hpp:154
ASRT_NODISCARD status emit_raw(uint8_t const *buf, callback< asrt_stream_done_cb > done_cb)
Emit from a pre-encoded buffer; used internally by stream_emit_sender.
Definition: stream.hpp:203
Definition: stream.hpp:26
A node in a doubly-linked channel chain.
Definition: chann.h:122
Reactor-side stream client (ASRT_STRM channel).
Definition: stream.h:53