如何使用python和静态创建的主题类创建OpenSplice DDS主题?

2024-06-25 23:06:48 发布

您现在位置:Python中文网/ 问答频道 /正文

我一直试图在Ubuntu20.04.2LTS上使用ADLINK的Vortex OpenSplice Community edition和Python API(PyEnv虚拟环境中的Python版本3.6)。我遵循了PythonDCPSAPIGuide,并在($OSPL_HOME/tools/python/examples)中使用了python示例。但是,我不知道如何使用idlpp为静态生成的主题类创建与域参与者关联的主题。我怎样才能做到这一点

到目前为止我所做的:

我有一个IDL文件,其中包含许多其他IDL文件的路径。我已使用以下bash脚本将这些IDL文件转换为python主题类:

#!/bin/bash

for FILE in *.idl; do
  $OSPL_HOME/bin/idlpp -I $OSPL_HOME/etc/idl -S -l python -d . $FILE
done

这将创建一系列python包(python主题类),我将它们导入到同一目录中的python脚本中

使用这些包,我想在我的python脚本中创建一个主题,或者向域参与者注册一个主题。例如,类似以下python代码的代码(但是“create_topic”函数不存在):

# myExampleDDSFile.py

from dds import *
from foo import foo_type # idlpp generated module/class
from foo2 import foo_type2 # idlpp generated module/class

dp = DomainParticipant()
topic = dp.create_topic('foo_topic',foo_type) # this function doesn't exist for a domain participant
pub = dp.create_publisher()

这可能吗?如果可能的话,我如何才能注册我在python中与域参与者静态创建的主题


我注意到在提供的python示例(例如$OSPL_HOME/tools/python/examples/example1.py)中,使用以下代码动态注册了一个主题,但我认为这与静态生成的python主题类无关:

# example1.py snippet

dp = DomainParticipant()
gen_info = ddsutil.get_dds_classes_from_idl('example1.idl', 'basic::module_SequenceOfStruct::SequenceOfStruct_struct')
topic = gen_info.register_topic(dp, 'Example1')

我在source code中也看不到相关函数

如果这是一个简单的问题,或者我遗漏了什么,我深表歉意-我对Vortex OpenSplice DDS非常陌生

任何帮助都将不胜感激


Tags: 文件代码from脚本home主题topicfoo
1条回答
网友
1楼 · 发布于 2024-06-25 23:06:48

我无法与OpenSplice通话,但您可以使用CoreDX DDS进行通话。例如,给定IDL文件“hello.IDL”:

struct StringMsg
{
   string msg;
};

coredx_ddl -l python -f hello.idl -d hello

下面的python是如何使用生成的“StringMsg”类型构造主题和数据读取器的示例:

import time
import dds.domain
from  hello import StringMsg

# Use default QoS, and no listener
dp    = dds.domain.DomainParticipant( 0 )
topic = dds.topic.Topic( StringMsg, "helloTopic", "StringMsg", dp )
sub   = dds.sub.Subscriber( dp )
dr    = dds.sub.DataReader( sub, topic )
rc    = dr.create_readcondition( dds.core.SampleState.ANY_SAMPLE_STATE,
                                 dds.core.ViewState.ANY_VIEW_STATE,
                                 dds.core.InstanceState.ANY_INSTANCE_STATE )
ws    = dds.cond.WaitSet()
ws.attach_condition(rc)

while True:
    t = dds.core.Duration.infinite()
    print('waiting for data...')
    ws.wait(t)
    while True:
        try:
            samples = dr.take( )
            for s in samples:
                if s.info.valid_data:
                    print("received: {}".format(s.data.msg))
        except dds.core.Error:
            break;

相关问题 更多 >