cloudevents sdk python开发包
cloudevents的Python项目详细描述
用于CloudEvents
的python sdk注意:此sdk仍被视为正在进行中的工作,每次更新都可能(并且将)中断。
包cloudevents提供用于cloudevents规范的原语:https://github.com/cloudevents/spec。
正在分析来自http请求的上游事件:
importiofromcloudevents.sdk.eventimportv02fromcloudevents.sdkimportmarshallerm=marshaller.NewDefaultHTTPMarshaller()event=m.FromRequest(v02.Event(),{"content-type":"application/cloudevents+json","ce-specversion":"0.2","ce-time":"2018-10-23T12:28:22.4579346Z","ce-id":"96fb5f0b-001e-0108-6dfe-da6e2806f124","ce-source":"<source-url>","ce-type":"word.found.name",},io.BytesIO(b"this is where your CloudEvent data"),lambdax:x.read())
在0.1版中创建最小cloudevent:
fromcloudevents.sdk.eventimportv01event=(v01.Event().SetContentType("application/json").SetData('{"name":"john"}').SetEventID("my-id").SetSource("from-galaxy-far-far-away").SetEventTime("tomorrow").SetEventType("cloudevent.greet.you"))
从cloudevent创建http请求:
fromcloudevents.sdkimportconvertersfromcloudevents.sdkimportmarshallerfromcloudevents.sdk.convertersimportstructuredfromcloudevents.sdk.eventimportv01event=(v01.Event().SetContentType("application/json").SetData('{"name":"john"}').SetEventID("my-id").SetSource("from-galaxy-far-far-away").SetEventTime("tomorrow").SetEventType("cloudevent.greet.you"))m=marshaller.NewHTTPMarshaller([structured.NewJSONHTTPCloudEventConverter()])headers,body=m.ToRequest(event,converters.TypeStructured,lambdax:x)
如何使用各种python http框架
在本主题中,您将发现如何将sdk与各种http框架集成的各种示例。
python请求
流行的框架之一是0.2-force-improvements。
要请求的CloudEvent
下面的代码显示了如何集成两个库,以便将cloudevent转换为http请求:
defrun_binary(event,url):binary_headers,binary_data=http_marshaller.ToRequest(event,converters.TypeBinary,json.dumps)print("binary CloudEvent")fork,vinbinary_headers.items():print("{0}: {1}\r\n".format(k,v))print(binary_data.getvalue())response=requests.post(url,headers=binary_headers,data=binary_data.getvalue())response.raise_for_status()defrun_structured(event,url):structured_headers,structured_data=http_marshaller.ToRequest(event,converters.TypeStructured,json.dumps)print("structured CloudEvent")print(structured_data.getvalue())response=requests.post(url,headers=structured_headers,data=structured_data.getvalue())response.raise_for_status()
将cloudevent转换为可以找到here的请求的完整示例。
请求CloudEvent
下面的代码显示了如何集成两个库以便从http请求创建cloudevent:
response=requests.get(url)response.raise_for_status()headers=response.headersdata=io.BytesIO(response.content)event=v02.Event()http_marshaller=marshaller.NewDefaultHTTPMarshaller()event=http_marshaller.FromRequest(event,headers,data,json.load)
将cloudevent转换为可以找到here的请求的完整示例。
sdk版本控制
这个包的目标是为所有发布的cloudevents版本提供支持,理想情况下,同时维护 相同的api。它将使用语义版本控制,规则如下:
- 当引入向后不兼容的更改时,主版本会增加。
- 当引入向后兼容功能时,小版本会增加,包括支持新的cloudevents版本。
- 当引入向后兼容的错误修复时,补丁版本会增加。