cloudevents sdk python开发包

cloudevents的Python项目详细描述


用于CloudEvents

的python sdk

注意:此sdk仍被视为正在进行中的工作,每次更新都可能(并且将)中断。

cloudevents提供用于cloudevents规范的原语:https://github.com/cloudevents/spec

正在分析来自http请求的上游事件:

importiofromcloudevents.sdk.eventimportv02fromcloudevents.sdkimportmarshallerm=marshaller.NewDefaultHTTPMarshaller()event=m.FromRequest(v02.Event(),{"content-type":"application/cloudevents+json","ce-specversion":"0.2","ce-time":"2018-10-23T12:28:22.4579346Z","ce-id":"96fb5f0b-001e-0108-6dfe-da6e2806f124","ce-source":"<source-url>","ce-type":"word.found.name",},io.BytesIO(b"this is where your CloudEvent data"),lambdax:x.read())

在0.1版中创建最小cloudevent:

fromcloudevents.sdk.eventimportv01event=(v01.Event().SetContentType("application/json").SetData('{"name":"john"}').SetEventID("my-id").SetSource("from-galaxy-far-far-away").SetEventTime("tomorrow").SetEventType("cloudevent.greet.you"))

从cloudevent创建http请求:

fromcloudevents.sdkimportconvertersfromcloudevents.sdkimportmarshallerfromcloudevents.sdk.convertersimportstructuredfromcloudevents.sdk.eventimportv01event=(v01.Event().SetContentType("application/json").SetData('{"name":"john"}').SetEventID("my-id").SetSource("from-galaxy-far-far-away").SetEventTime("tomorrow").SetEventType("cloudevent.greet.you"))m=marshaller.NewHTTPMarshaller([structured.NewJSONHTTPCloudEventConverter()])headers,body=m.ToRequest(event,converters.TypeStructured,lambdax:x)

如何使用各种python http框架

在本主题中,您将发现如何将sdk与各种http框架集成的各种示例。

python请求

流行的框架之一是0.2-force-improvements

要请求的CloudEvent

下面的代码显示了如何集成两个库,以便将cloudevent转换为http请求:

defrun_binary(event,url):binary_headers,binary_data=http_marshaller.ToRequest(event,converters.TypeBinary,json.dumps)print("binary CloudEvent")fork,vinbinary_headers.items():print("{0}: {1}\r\n".format(k,v))print(binary_data.getvalue())response=requests.post(url,headers=binary_headers,data=binary_data.getvalue())response.raise_for_status()defrun_structured(event,url):structured_headers,structured_data=http_marshaller.ToRequest(event,converters.TypeStructured,json.dumps)print("structured CloudEvent")print(structured_data.getvalue())response=requests.post(url,headers=structured_headers,data=structured_data.getvalue())response.raise_for_status()

将cloudevent转换为可以找到here的请求的完整示例。

请求CloudEvent

下面的代码显示了如何集成两个库以便从http请求创建cloudevent:

response=requests.get(url)response.raise_for_status()headers=response.headersdata=io.BytesIO(response.content)event=v02.Event()http_marshaller=marshaller.NewDefaultHTTPMarshaller()event=http_marshaller.FromRequest(event,headers,data,json.load)

将cloudevent转换为可以找到here的请求的完整示例。

sdk版本控制

这个包的目标是为所有发布的cloudevents版本提供支持,理想情况下,同时维护 相同的api。它将使用语义版本控制,规则如下:

  • 当引入向后不兼容的更改时,主版本会增加。
  • 当引入向后兼容功能时,小版本会增加,包括支持新的cloudevents版本。
  • 当引入向后兼容的错误修复时,补丁版本会增加。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
javascript通过WebSocket将服务器时间推送到多个客户端   这种java字节转换是如何工作的   数据库错误。sql。SQLEception:参数索引超出范围(1>参数数,为0)   java如何在搜索时过滤选定的viewpager片段中的recyclerview?   java如何使用OpenCV将图像转换为黑白图像并在ANDROID中消除阴影   Spring MVC项目中的java HTTP服务器状态404错误   Spring MVC中的java JSR303自定义约束验证器   java如何基于另一个ArrayList的值显示ArrayList中的特定项?   java如何在firebase messages节点获取最后发送给我的消息?   部署后在google app engine上运行servlet时发生java错误   java如何使用servlet在两个jsp之间发送数据   java日历年中的周是月中的周   从URL读取一个资源并直接返回这些字节作为REST请求的响应,Java 7和spring MVC 3.2不存储内存