Hi,
Is there any documentation on how to connect to Bloomberg API using a server-side API?
I have the code for it but don’t know how to use the functions to get the data.
import getpass
kwargs = {
"user_id": f"tdsecurities/{getpass.getuser()}",
"ip_address": "49.23.74.146", # Inset IP addresss here
"app_name": "td:Notebooks",
"server_host": "69.184.252.116",
"server_port": 8194
}
import blpapi
def create_session(**kwargs):
user = blpapi.AuthUser.createWithManualOptions(userId=kwargs['user_id'], ipAddress=kwargs['ip_address'])
auth = blpapi.AuthOptions.createWithUserAndApp(user=user, appName=kwargs['app_name'])
sess_opts = blpapi.SessionOptions()
sess_opts.setSessionIdentityOptions(authOptions=auth)
sess_opts.setServerHost(serverHost=kwargs['server_host'])
sess_opts.setServerPort(serverPort=kwargs['server_port'])
return blpapi.Session(sess_opts)
session = create_session(**kwargs)
if session.start():
print("You have started a session")
else:
print("You haven't started a session")
APIFLDS_SVC = "//blp/apiflds"
CATEGORIZED_FIELD_SEARCH_REQUEST = "CategorizedFieldSearchRequest"
FIELD_INFO_REQUEST = "FieldInfoRequest"
FIELD_LIST_REQUEST = "FieldListRequest"
FIELD_SEARCH_REQUEST = "FieldSearchRequest"
ID_LEN = 13
MNEMONIC_LEN = 36
DESC_LEN = 40
CAT_NAME_LEN = 40
def printHeader():
"""Prints the BBG header data in a readable format."""
print(
f'{"FIELD ID".ljust(ID_LEN)}'
f'{"MNEMONIC".ljust(MNEMONIC_LEN)}'
f'{"DESCRIPTION".ljust(DESC_LEN)}'
)
print(
f'{"-----------".ljust(ID_LEN)}'
f'{"-----------".ljust(MNEMONIC_LEN)}'
f'{"-----------".ljust(DESC_LEN)}'
)
def printField(field):
"""Prints the BBG field data in a readable format."""
fieldId = field["id"]
if "fieldInfo" in field:
fieldInfo = field["fieldInfo"]
fieldMnemonic = fieldInfo["mnemonic"]
fieldDesc = fieldInfo["description"]
print(
f"{fieldId.ljust(ID_LEN)}"
f"{fieldMnemonic.ljust(MNEMONIC_LEN)}"
f"{fieldDesc.ljust(DESC_LEN)}"
)
else:
errorMsg = field["fieldError"]["message"]
print()
print(f" ERROR: {fieldId} - {errorMsg}")
def _createRequest(apifldsService):
request = apifldsService.createRequest("CategorizedFieldSearchRequest")
request.set("searchSpec", "last price")
exclude = request.getElement("exclude")
exclude.setElement("fieldType", "Static")
request.set("returnFieldDocumentation", False)
return request
def _processResponse(event):
for msg in event:
if "fieldSearchError" in msg:
print(msg)
continue
categories = msg["category"]
for category in categories:
category_name = category["categoryName"]
category_id = category["categoryId"]
print(
f"\nCategory Name: {category_name.ljust(40)}"
f"\tId: {category_id}"
)
printHeader()
fields = category["fieldData"]
for field in fields:
printField(field)
break
break
break
def createRequest(requestType, session):
"""Sends a request based on the request type."""
apifldsService = session.getService(APIFLDS_SVC)
if requestType == CATEGORIZED_FIELD_SEARCH_REQUEST:
return _createRequest(apifldsService)
def processResponse(requestType, event):
"""Processes a response to the request."""
if requestType == CATEGORIZED_FIELD_SEARCH_REQUEST:
return _processResponse(event)
def main():
session = create_session(**kwargs)
try:
if not session.start():
print("Failed to start session.")
return
if not session.openService(APIFLDS_SVC):
print(f"Failed to open {APIFLDS_SVC}.")
return
request = createRequest(CATEGORIZED_FIELD_SEARCH_REQUEST, session)
print(f"Sending Request: {request}")
session.sendRequest(request)
done = False
while not done:
event = session.nextEvent()
eventType = event.eventType()
if eventType == blpapi.Event.REQUEST_STATUS:
for msg in event:
if msg.messageType() == blpapi.Names.REQUEST_FAILURE:
# Request has failed, exit
print(msg)
done = True
break
elif eventType in [
blpapi.Event.RESPONSE,
blpapi.Event.PARTIAL_RESPONSE,
]:
processResponse(CATEGORIZED_FIELD_SEARCH_REQUEST, event)
# Received the final response, no further response events are
# expected.
if eventType == blpapi.Event.RESPONSE:
done = True
finally:
session.stop()
if __name__ == "__main__":
try:
main()
except Exception as e:
print(e)